[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704
 ] 

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:27 AM:


[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the 
user a heads-up about the problem preemptively.

For instance, by throwing an exception.

Is it possible to get feedback from the broker that this payload is too big and 
then stop, rather than keep retrying?
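
For reference, a minimal sketch of that kind of explicit failure (a plain 
producer against a hypothetical {{connect-configs}} topic, not the actual 
Connect herder code): an oversized record fails the send with 
RecordTooLargeException, which could be surfaced instead of silently retrying.

{code:java}
// Hedged sketch, not the Connect internals: a record that exceeds the
// producer's max.request.size (or the broker/topic message.max.bytes /
// max.message.bytes limit) fails the send future with RecordTooLargeException.
// That is the signal a connector/herder could turn into a visible error.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
    byte[] hugeTaskConfig = new byte[2 * 1024 * 1024]; // larger than the 1 MB default limits
    producer.send(new ProducerRecord<>("connect-configs", hugeTaskConfig)).get();
} catch (InterruptedException | ExecutionException e) {
    if (e.getCause() instanceof RecordTooLargeException) {
        // Surface the failure loudly instead of entering a restart loop.
        throw new RuntimeException("Task configuration is too large to store", e.getCause());
    }
}
{code}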


was (Author: olkuznsmith):
[~rhauch] Sure, workaround is clear, but the ticket is more about letting user 
heads up about the problem preventively.

For instance, by throwing exception.

Is it possible to have feedback from broker that this payload is too big and 
just stop working?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> When a config message for the config topic is larger than the Kafka broker 
> allows to be stored, the source connector enters an infinite restart loop 
> without any error indication.
> An exception could be thrown in this case, or large configs could be handled 
> more gracefully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704
 ] 

Oleg Kuznetsov commented on KAFKA-7593:
---

[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the 
user a heads-up about the problem preemptively.

For instance, by throwing an exception.

Is it possible to get feedback from the broker that this payload is too big and 
then stop, rather than keep retrying?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> When a config message for the config topic is larger than the Kafka broker 
> allows to be stored, the source connector enters an infinite restart loop 
> without any error indication.
> An exception could be thrown in this case, or large configs could be handled 
> more gracefully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704
 ] 

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:23 AM:


[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the 
user a heads-up about the problem preemptively.

For instance, by throwing an exception.

Is it possible to get feedback from the broker that this payload is too big and 
then stop, rather than keep retrying?


was (Author: olkuznsmith):
[~rhauch] Sure, workaround is clear, but the ticket is more about to let user 
head up about the problem preventively.

For instance, by throwing exception.

Is it possible to have feedback from broker that this payload is too big and 
just stop working?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> When a config message for the config topic is larger than the Kafka broker 
> allows to be stored, the source connector enters an infinite restart loop 
> without any error indication.
> An exception could be thrown in this case, or large configs could be handled 
> more gracefully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException

2018-11-06 Thread Jin Tianfan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677601#comment-16677601
 ] 

Jin Tianfan commented on KAFKA-3980:


[~rsivaram] Unfortunately, we did not open the JMX port, so we cannot analyze 
these metrics directly. In the heap dump, however, we find numerous strings like 
"client-id=admin-\{anyNumber}" or "client-id=consumer-\{anyNumber}", as shown in 
the picture below:

!heap_img.png!

The dump file is too big to share; even compressed it is still 800 MB, and I 
don't know how to share it.
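
For context, a hedged client-side sketch (illustrative property values, not a 
claim about the broker internals): metric names are tagged with {{client.id}}, 
and auto-generated ids like {{producer-1}}, {{producer-2}} create a brand new 
metric series every time a client is recreated, which is the kind of unbounded 
growth visible in the heap dump. A stable {{client.id}} plus a proper 
{{close()}} keeps the series bounded.

{code:java}
// Hedged sketch: a producer with an explicit, stable client.id. Without it,
// every new KafkaProducer instance gets an auto-generated id (producer-1,
// producer-2, ...), so each restart of the producers introduces a fresh set of
// client-id tagged metrics. close() also deregisters the client's own metrics.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "orders-producer"); // stable id bounds metric cardinality

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
    producer.send(new ProducerRecord<>("orders", "key", "value"));
} finally {
    producer.close(); // deregisters this client's metrics instead of leaking them
}
{code}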

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.1
>Reporter: Andrew Jorgensen
>Priority: Major
> Attachments: heap_img.png
>
>
> I have some nodes in a kafka cluster that occasionally will run out of memory 
> whenever I restart the producers. I was able to take a heap dump from both a 
> recently restarted Kafka node which weighed in at about 20 MB and a node that 
> has been running for 2 months is using over 700MB of memory. Looking at the 
> heap dump it looks like the JmxReporter is holding on to metrics and causing 
> them to build up over time. 
> !http://imgur.com/N6Cd0Ku.png!
> !http://imgur.com/kQBqA2j.png!
> The ultimate problem this causes is that there is a chance that when I restart 
> the producers the node will experience a Java heap space exception and OOM. The 
> nodes then fail to start up correctly and write -1 as the leader for the 
> partitions they were responsible for, effectively resetting the offset and 
> rendering those partitions unavailable. The Kafka process then needs to be 
> restarted in order to re-assign the node to the partitions that it owns.
> I have a few questions:
> 1. I am not quite sure why there are so many client id entries in that 
> JmxReporter map.
> 2. Is there a way to have the JmxReporter release metrics after a set amount 
> of time or a way to turn certain high cardinality metrics like these off?
> I can provide any logs or heap dumps if more information is needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException

2018-11-06 Thread Jin Tianfan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Tianfan updated KAFKA-3980:
---
Attachment: heap_img.png

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.1
>Reporter: Andrew Jorgensen
>Priority: Major
> Attachments: heap_img.png
>
>
> I have some nodes in a kafka cluster that occasionally will run out of memory 
> whenever I restart the producers. I was able to take a heap dump from both a 
> recently restarted Kafka node which weighed in at about 20 MB and a node that 
> has been running for 2 months is using over 700MB of memory. Looking at the 
> heap dump it looks like the JmxReporter is holding on to metrics and causing 
> them to build up over time. 
> !http://imgur.com/N6Cd0Ku.png!
> !http://imgur.com/kQBqA2j.png!
> The ultimate problem this causes is that there is a chance that when I restart 
> the producers the node will experience a Java heap space exception and OOM. The 
> nodes then fail to start up correctly and write -1 as the leader for the 
> partitions they were responsible for, effectively resetting the offset and 
> rendering those partitions unavailable. The Kafka process then needs to be 
> restarted in order to re-assign the node to the partitions that it owns.
> I have a few questions:
> 1. I am not quite sure why there are so many client id entries in that 
> JmxReporter map.
> 2. Is there a way to have the JmxReporter release metrics after a set amount 
> of time or a way to turn certain high cardinality metrics like these off?
> I can provide any logs or heap dumps if more information is needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7313) StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7313.
-
Resolution: Fixed

> StopReplicaRequest should attempt to remove future replica for the partition 
> only if future replica exists
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> This patch fixes two issues:
> 1) Currently, if a broker receives StopReplicaRequest with delete=true twice 
> for the same offline replica, the first StopReplicaRequest will return 
> KafkaStorageException and the second will return ReplicaNotAvailableException. 
> This is because the first StopReplicaRequest removes the mapping (tp -> 
> ReplicaManager.OfflinePartition) from ReplicaManager.allPartitions before 
> returning KafkaStorageException, so the second StopReplicaRequest no longer 
> finds this partition as offline.
> This behavior is inconsistent. Since the replica is already offline and the 
> broker will not be able to delete files for it, the StopReplicaRequest should 
> fail without making any change, and the broker should still remember that this 
> replica is offline.
> 2) Currently, if a broker receives StopReplicaRequest with delete=true, it will 
> attempt to remove the future replica for the partition, which causes 
> KafkaStorageException in the StopReplicaResponse if the partition does not have 
> a future replica. It is problematic to always return KafkaStorageException in 
> the response when the future replica does not exist.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7481:

Fix Version/s: (was: 2.1.0)
   2.2.0

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?
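
For option 1, the operational picture is roughly the following (illustrative 
property values only, not an official upgrade note): run the new binaries but 
keep the persisted-format gates pinned to the old release until a downgrade is 
no longer needed.

{noformat}
# server.properties on a broker already running the new binaries (illustrative values)
# Keep the old inter-broker protocol so the new offset commit value schema is not written:
inter.broker.protocol.version=2.0
# Keep the on-disk message format pinned as well if rollback must stay possible:
log.message.format.version=2.0
{noformat}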



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states

2018-11-06 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7537.

   Resolution: Fixed
Fix Version/s: 2.2.0

Merged the PR to trunk.

> Only include live brokers in the UpdateMetadataRequest sent to existing 
> brokers if there is no change in the partition states
> -
>
> Key: KAFKA-7537
> URL: https://issues.apache.org/jira/browse/KAFKA-7537
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.2.0
>
>
> Currently, when brokers join/leave the cluster without any partition state 
> changes, the controller sends out UpdateMetadataRequests containing the states 
> of all partitions to all brokers. But for existing brokers in the cluster, the 
> metadata diff between the controller and the broker should only be the 
> "live_brokers" info. Only brokers with an empty metadata cache need the full 
> UpdateMetadataRequest. Sending the full UpdateMetadataRequest to all brokers 
> can place non-negligible memory pressure on the controller side.
> Let's say in total we have N brokers, M partitions in the cluster and we want 
> to add 1 brand new broker in the cluster. With RF=2, the memory footprint per 
> partition in the UpdateMetadataRequest is ~200 Bytes. In the current 
> controller implementation, if each of the N RequestSendThreads serializes and 
> sends out the UpdateMetadataRequest at roughly the same time (which is very 
> likely the case), we will end up using *(N+1)*M*200B*. In a large kafka 
> cluster, we can have:
> {noformat}
> N=99
> M=100k
> Memory usage to send out UpdateMetadataRequest to all brokers:
> 100 * 100K * 200B = 2G
> However, we only need to send the full UpdateMetadataRequest to the newly 
> added broker. We only need to include the live broker ids (4B * 100 brokers) in 
> the UpdateMetadataRequest sent to the existing 99 brokers. So the amount of 
> data that is actually needed is:
> 1 * 100K * 200B + 99 * (100 * 4B) = ~21M
> We can potentially reduce the memory footprint by 2G / 21M = ~95x, as well as 
> the data transferred over the network.{noformat}
>  
> This issue hurts the scalability of a Kafka cluster. KIP-380 and KAFKA-7186 
> also help to further reduce the controller memory footprint.
>  
> In terms of implementation, we can keep some in-memory state on the controller 
> side to differentiate existing brokers from uninitialized brokers (e.g. brand 
> new brokers), so that if there is no change in partition states, we only send 
> live-broker info to existing brokers.
>  
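
The estimate above, restated as a quick back-of-the-envelope check (the 
per-partition and per-broker-id sizes are the rough figures from this 
description, not measured values):

{code:java}
// Rough arithmetic only, mirroring the numbers quoted in the description.
long brokers = 100;              // N + 1 after adding one new broker
long partitions = 100_000;       // M
long bytesPerPartition = 200;    // approx. UpdateMetadataRequest state per partition
long bytesPerBrokerId = 4;

long fullToAllBrokers = brokers * partitions * bytesPerPartition;        // ~2 GB in flight
long fullToNewBroker  = partitions * bytesPerPartition;                  // ~20 MB
long liveBrokersOnly  = (brokers - 1) * (brokers * bytesPerBrokerId);    // ~40 KB to the existing 99
System.out.printf("before: %,d bytes, after: %,d bytes (~%dx reduction)%n",
        fullToAllBrokers, fullToNewBroker + liveBrokersOnly,
        fullToAllBrokers / (fullToNewBroker + liveBrokersOnly));
{code}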



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677407#comment-16677407
 ] 

ASF GitHub Bot commented on KAFKA-7537:
---

junrao closed pull request #5869:  KAFKA-7537: Avoid sending full 
UpdateMetadataRequest to existing brokers in the cluster on broker changes to 
reduce controller memory footprint
URL: https://github.com/apache/kafka/pull/5869
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 85da8b8c0b2..a11f5535bda 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
   }
 }
 
-val givenPartitions = if (partitions.isEmpty)
-  controllerContext.partitionLeadershipInfo.keySet
-else
-  partitions
-
 updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
-givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
+partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
   beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 740ab7ff78c..a52f3f02363 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
 // partitionStateMachine.startup().
 info("Sending update metadata request")
-sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
 
 replicaStateMachine.startup()
 partitionStateMachine.startup()
@@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 info(s"New broker startup callback for ${newBrokers.mkString(",")}")
 newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
 val newBrokersSet = newBrokers.toSet
-// send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
-// broker via this update.
+val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
+// Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
+// via this update. No need to include any partition states in the request since there are no partition state changes.
+sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
+// Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization.
 // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
-// common controlled shutdown case, the metadata will reach the new brokers faster
-sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+// common controlled shutdown case, the metadata will reach the new brokers faster.
+sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
 // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
 // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
 val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -421,7 +424,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def onBrokerUpdate(updatedBrokerId: Int) {
 info(s"Broker info update callback for $updatedBrokerId")
-sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
   }
 
   /**
@@ -458,10 +461,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers of 

[jira] [Created] (KAFKA-7602) Improve usage of @see tag in Streams JavaDocs

2018-11-06 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-7602:
--

 Summary: Improve usage of @see tag in Streams JavaDocs
 Key: KAFKA-7602
 URL: https://issues.apache.org/jira/browse/KAFKA-7602
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


As discussed on this PR 
[https://github.com/apache/kafka/pull/5273/files/bd8410ed3d5be9ca89e963687aa05e953d712b62..e4e3eed141447baf1c70ff15e2dc0df4e9a33f12#r223510489]
 we extensively use `@see` tags in Streams API Java docs.

This ticket is about revisiting all public JavaDocs (KStream, KTable, 
KGroupedStream, KGroupedTable, etc.) and defining and documenting (in the wiki) 
a coherent strategy for the usage of the `@see` tag, with the goal of guiding 
users on how to use the API without using `@see` so often that it causes confusion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677334#comment-16677334
 ] 

ASF GitHub Bot commented on KAFKA-7481:
---

lindong28 closed pull request #5857: KAFKA-7481; Add upgrade/downgrade notes 
for 2.1.x
URL: https://github.com/apache/kafka/pull/5857
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/upgrade.html b/docs/upgrade.html
index 41e2277bb24..33d9964113a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -20,6 +20,47 @@
 

[jira] [Reopened] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reopened KAFKA-7481:
-

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7481.
-
Resolution: Fixed

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-7481:
---

Assignee: Jason Gustafson

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-11-06 Thread Kamal Kang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677312#comment-16677312
 ] 

Kamal Kang commented on KAFKA-6812:
---

[~enether], sure, I can write a KIP for this issue.

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> When {{ConsoleProducer}} is run without the {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data loss. Maybe it's reasonable to exit after the first error.
>   
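
A minimal sketch of the direction such a KIP could take (hypothetical class, 
not the existing {{kafka.tools.ConsoleProducer}} code): count asynchronous send 
failures in the callback and fold them into the process exit status.

{code:java}
// Hypothetical sketch only -- not the current tool implementation.
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ExitStatusTrackingCallback implements Callback {
    private static final AtomicLong FAILED_SENDS = new AtomicLong();

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            FAILED_SENDS.incrementAndGet(); // record the data loss; error logging stays as today
        }
    }

    /** Checked once after producer.flush()/close(); non-zero signals data loss. */
    public static int exitStatus() {
        return FAILED_SENDS.get() > 0 ? 1 : 0;
    }
}
{code}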



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR

2018-11-06 Thread Jaume M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677302#comment-16677302
 ] 

Jaume M commented on KAFKA-6522:


I believe this is happening to me because there were some Kafka nodes using the 
same ZooKeeper cluster that I wasn't aware of, which would be consistent with 
what was said two comments before this one.

> Retrying leaderEpoch request for partition xxx as the leader reported an 
> error: UNKNOWN_SERVER_ERROR
> 
>
> Key: KAFKA-6522
> URL: https://issues.apache.org/jira/browse/KAFKA-6522
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.0.0
> Environment: Ubuntu 16.04 LTS 64bit-server
>Reporter: Wang Shuxiao
>Priority: Major
>
> We have 3 brokers in a Kafka cluster (broker ids: 401, 402, 403). Broker 403 
> fails to fetch data from the leader:
> {code:java}
> [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
> the leader reported an error: UNKNOWN_SERVER_ERROR 
> (kafka.server.ReplicaFetcherThread)
> [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=3] Error when sending leader epoch request for 
> Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, 
> sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, 
> sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, 
> pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, 
> pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 
> -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, 
> pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, 
> __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, 
> sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
> -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, 
> sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 
> -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, 
> smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, 
> __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> 
> -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 401 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
>  at 
> kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
>  at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
>  
> on the leader(broker-401) side, the log shows:
> {code:java}
> [2018-02-02 08:58:26,859] ERROR Closing socket for 
> 192.168.100.101:9099-192.168.100.103:30476 because of error 
> (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: 23 and apiVersion: 0
> Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it 
> should be between `0` and `20` (inclusive)
>  at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
>  at 
> kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
>  at kafka.network.RequestChannel$Request.(RequestChannel.scala:91)
>  at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
>  at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
>  at kafka.network.Processor.run(SocketServer.scala:417)
>  at java.lang.Thread.run(Thread.java:745){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7313) StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677229#comment-16677229
 ] 

ASF GitHub Bot commented on KAFKA-7313:
---

lindong28 closed pull request #5533: KAFKA-7313; StopReplicaRequest should 
attempt to remove future replica for the partition only if future replica exists
URL: https://github.com/apache/kafka/pull/5533
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 819da2cfd42..745c89a393b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -352,7 +352,8 @@ class Partition(val topicPartition: TopicPartition,
   leaderEpochStartOffsetOpt = None
   removePartitionMetrics()
   logManager.asyncDelete(topicPartition)
-  logManager.asyncDelete(topicPartition, isFuture = true)
+  if (logManager.getLog(topicPartition, isFuture = true).isDefined)
+    logManager.asyncDelete(topicPartition, isFuture = true)
 }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 39029b078d2..26bfbe9e0fe 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -845,7 +845,7 @@ class LogManager(logDirs: Seq[File],
   addLogToBeDeleted(removedLog)
   info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
 } else if (offlineLogDirs.nonEmpty) {
-  throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
+  throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")
 }
 removedLog
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1146befdc8e..443a5cfd08b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -338,8 +338,10 @@ class ReplicaManager(val config: KafkaConfig,
 
 if (deletePartition) {
   val removedPartition = allPartitions.remove(topicPartition)
-  if (removedPartition eq ReplicaManager.OfflinePartition)
+  if (removedPartition eq ReplicaManager.OfflinePartition) {
+    allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
     throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
+  }
 
   if (removedPartition != null) {
     val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
@@ -1402,7 +1404,8 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   // logDir should be an absolute path
-  def handleLogDirFailure(dir: String) {
+  // sendZkNotification is needed for unit test
+  def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true) {
 if (!logManager.isLogDirOnline(dir))
   return
 info(s"Stopping serving replicas in dir $dir")
@@ -1438,7 +1441,9 @@ class ReplicaManager(val config: KafkaConfig,
     s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
 }
 logManager.handleLogDirFailure(dir)
-zkClient.propagateLogDirEvent(localBrokerId)
+
+if (sendZkNotification)
+  zkClient.propagateLogDirEvent(localBrokerId)
 info(s"Stopped serving replicas in dir $dir")
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
new file mode 100644
index 000..5df61ebe56e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
@@ -0,0 +1,57 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is 

[jira] [Updated] (KAFKA-7313) StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7313:

Fix Version/s: (was: 2.1.1)
   2.1.0
   2.0.1

> StopReplicaRequest should attempt to remove future replica for the partition 
> only if future replica exists
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> This patch fixes two issues:
> 1) Currently, if a broker receives StopReplicaRequest with delete=true twice 
> for the same offline replica, the first StopReplicaRequest will return 
> KafkaStorageException and the second will return ReplicaNotAvailableException. 
> This is because the first StopReplicaRequest removes the mapping (tp -> 
> ReplicaManager.OfflinePartition) from ReplicaManager.allPartitions before 
> returning KafkaStorageException, so the second StopReplicaRequest no longer 
> finds this partition as offline.
> This behavior is inconsistent. Since the replica is already offline and the 
> broker will not be able to delete files for it, the StopReplicaRequest should 
> fail without making any change, and the broker should still remember that this 
> replica is offline.
> 2) Currently, if a broker receives StopReplicaRequest with delete=true, it will 
> attempt to remove the future replica for the partition, which causes 
> KafkaStorageException in the StopReplicaResponse if the partition does not have 
> a future replica. It is problematic to always return KafkaStorageException in 
> the response when the future replica does not exist.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7559.
-
Resolution: Fixed

> ConnectStandaloneFileTest system tests do not pass
> --
>
> Key: KAFKA-7559
> URL: https://issues.apache.org/jira/browse/KAFKA-7559
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under 
> `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with 
> error messages similar to:
> "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in 
> condition mode: LISTEN"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7559:

Fix Version/s: 2.1.0
   2.0.1

> ConnectStandaloneFileTest system tests do not pass
> --
>
> Key: KAFKA-7559
> URL: https://issues.apache.org/jira/browse/KAFKA-7559
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under 
> `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with 
> error messages similar to:
> "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in 
> condition mode: LISTEN"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR

2018-11-06 Thread Jaume M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677217#comment-16677217
 ] 

Jaume M commented on KAFKA-6522:


The same happens to me at the end of this tutorial when I follow it without any 
deviation: [https://data-flair.training/blogs/kafka-cluster/]. It goes away 
when I restart the nodes.

> Retrying leaderEpoch request for partition xxx as the leader reported an 
> error: UNKNOWN_SERVER_ERROR
> 
>
> Key: KAFKA-6522
> URL: https://issues.apache.org/jira/browse/KAFKA-6522
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.0.0
> Environment: Ubuntu 16.04 LTS 64bit-server
>Reporter: Wang Shuxiao
>Priority: Major
>
> We have 3 brokers in a Kafka cluster (broker ids: 401, 402, 403). Broker 403 
> fails to fetch data from the leader:
> {code:java}
> [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
> the leader reported an error: UNKNOWN_SERVER_ERROR 
> (kafka.server.ReplicaFetcherThread)
> [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=3] Error when sending leader epoch request for 
> Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, 
> sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, 
> sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, 
> pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, 
> pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 
> -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, 
> pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, 
> __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, 
> sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
> -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, 
> sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 
> -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, 
> smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, 
> __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> 
> -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 401 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
>  at 
> kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
>  at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
>  
> on the leader(broker-401) side, the log shows:
> {code:java}
> [2018-02-02 08:58:26,859] ERROR Closing socket for 
> 192.168.100.101:9099-192.168.100.103:30476 because of error 
> (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: 23 and apiVersion: 0
> Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it 
> should be between `0` and `20` (inclusive)
>  at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
>  at 
> kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
>  at kafka.network.RequestChannel$Request.(RequestChannel.scala:91)
>  at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
>  at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
>  at kafka.network.Processor.run(SocketServer.scala:417)
>  at java.lang.Thread.run(Thread.java:745){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR

2018-11-06 Thread Jaume M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677217#comment-16677217
 ] 

Jaume M edited comment on KAFKA-6522 at 11/6/18 7:33 PM:
-

The same happens to me at the end of this tutorial when I follow it without any 
deviation: [https://data-flair.training/blogs/kafka-cluster/]. It goes away 
when I restart the nodes. In my case I am using \{{kafka_2.11-2.0.0.tgz}} on all 
three nodes.


was (Author: jmarhuen):
Same happens to me by the end of this tutorial if I follow it without any 
deviation: [https://data-flair.training/blogs/kafka-cluster/] . It goes away 
when I restart the nodes.

> Retrying leaderEpoch request for partition xxx as the leader reported an 
> error: UNKNOWN_SERVER_ERROR
> 
>
> Key: KAFKA-6522
> URL: https://issues.apache.org/jira/browse/KAFKA-6522
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.0.0
> Environment: Ubuntu 16.04 LTS 64bit-server
>Reporter: Wang Shuxiao
>Priority: Major
>
> We have 3 brokers in a Kafka cluster (broker ids: 401, 402, 403). Broker 403 
> fails to fetch data from the leader:
> {code:java}
> [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
> the leader reported an error: UNKNOWN_SERVER_ERROR 
> (kafka.server.ReplicaFetcherThread)
> [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=3] Error when sending leader epoch request for 
> Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, 
> sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, 
> sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, 
> pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, 
> pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 
> -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, 
> pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, 
> __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, 
> sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
> -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, 
> sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 
> -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, 
> smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, 
> __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> 
> -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 401 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
>  at 
> kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
>  at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
>  
> on the leader(broker-401) side, the log shows:
> {code:java}
> [2018-02-02 08:58:26,859] ERROR Closing socket for 
> 192.168.100.101:9099-192.168.100.103:30476 because of error 
> (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: 23 and apiVersion: 0
> Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it 
> should be between `0` and `20` (inclusive)
>  at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
>  at 
> kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
>  at kafka.network.RequestChannel$Request.(RequestChannel.scala:91)
>  at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
>  at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
>  at kafka.network.Processor.run(SocketServer.scala:417)
>  at java.lang.Thread.run(Thread.java:745){code}


[jira] [Commented] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677207#comment-16677207
 ] 

ASF GitHub Bot commented on KAFKA-7560:
---

lindong28 opened a new pull request #5886: KAFKA-7560; PushHttpMetricsReporter 
should not convert metric value t…
URL: https://github.com/apache/kafka/pull/5886
 
 
   @ijuma @omkreddy would you have time to review this patch? It seems to be 
the last issue for the 2.1 system test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> PushHttpMetricsReporter should not convert metric value to double
> -
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> Currently PushHttpMetricsReporter will convert the value from 
> KafkaMetric.metricValue() to double. This will not work for non-numerical 
> metrics such as the version in AppInfoParser, whose value can be a string. This 
> has caused an issue for PushHttpMetricsReporter, which in turn caused the 
> system test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with 
> the following exception:
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate
>      metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence
> {code}
> Since we allow the metric value to be an Object, PushHttpMetricsReporter should 
> also read the metric value as an Object and pass it to the HTTP server.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7560:

Description: 
Currently PushHttpMetricsReporter will convert the value from 
KafkaMetric.metricValue() to double. This will not work for non-numerical 
metrics such as the version in AppInfoParser, whose value can be a string. This 
has caused an issue for PushHttpMetricsReporter, which in turn caused the system 
test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with the 
following exception:
{code:java}
 File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate
     metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
ValueError: max() arg is an empty sequence
{code}
Since we allow the metric value to be an Object, PushHttpMetricsReporter should 
also read the metric value as an Object and pass it to the HTTP server.
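
A hedged sketch of that change on the reporting side (field names and JSON 
shape are illustrative, not the actual PushHttpMetricsReporter code): carry the 
Object returned by {{KafkaMetric.metricValue()}} through to serialization 
instead of casting it to double.

{code:java}
// Illustrative only -- not the real PushHttpMetricsReporter implementation.
// metricValue() returns Object: a Double for measurable metrics, but e.g. a
// String for gauges such as the app-info "version" metric, so no double cast.
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;

Map<String, Object> toJsonFields(KafkaMetric metric) {
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("name", metric.metricName().name());
    fields.put("group", metric.metricName().group());
    fields.put("tags", metric.metricName().tags());
    fields.put("value", metric.metricValue()); // keep the Object as-is
    return fields;
}
{code}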

  was:
Currently metricValue

 

The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
when I run it locally. It produces the following error message:
{code:java}
 File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in 
validate     metric.value for k, metrics in 
producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
client_id=producer.client_id) for metric in metrics ValueError: max() arg is an 
empty sequence
{code}
I assume it cannot find the metric it's searching for


> PushHttpMetricsReporter should not convert metric value to double
> -
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> Currently PushHttpMetricsReporter will convert the value from 
> KafkaMetric.metricValue() to a double. This will not work for non-numerical 
> metrics such as the version in AppInfoParser, whose value can be a string. This 
> has caused an issue for PushHttpMetricsReporter, which in turn caused the system 
> test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with the 
> following exception:  
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate
>     metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence
> {code}
> Since we allow the metric value to be an Object, PushHttpMetricsReporter should 
> also read the metric value as an Object and pass it to the HTTP server.
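
As a rough illustration of the proposed change (a sketch only, not the actual 
patch; the `samples` list and the `MetricValue` holder are hypothetical names), 
the reporter would forward whatever Object the metric reports instead of casting 
it to a double:
{code:java}
// Sketch: keep the metric value as an Object so non-numeric metrics survive.
for (KafkaMetric metric : metrics) {
    Object value = metric.metricValue();  // may be a Double, a String (e.g. the AppInfoParser version), etc.
    samples.add(new MetricValue(metric.metricName().name(), value));
}
{code}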



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-06 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677173#comment-16677173
 ] 

Matthias J. Sax commented on KAFKA-7595:


Just read [~vvcephei]'s comments. This is not a bug but expected behavior. We 
have an example in the wiki about how to compute an average correctly: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?]
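
For readers landing here, a rough sketch of the aggregate-based pattern that page 
describes (the CountAndSum class, its serde, and the key/value types are 
illustrative assumptions, not taken from the demo app): keep the count and the 
sum in a single aggregate and derive the average in one mapValues step, instead 
of joining two separately maintained KTables.
{code:java}
// Illustrative sketch only: one aggregate holding both the running count and the running sum.
public class CountAndSum {
    public long count;
    public double sum;
}

KTable<Long, Double> ratingAverage = ratingsById
    .aggregate(
        CountAndSum::new,
        (key, rating, agg) -> {
            agg.count += 1;
            agg.sum += rating;
            return agg;
        },
        Materialized.with(Serdes.Long(), countAndSumSerde))  // countAndSumSerde is assumed to exist
    .mapValues(agg -> agg.sum / agg.count, Materialized.as("average-ratings"));
{code}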

 

> Kafka Streams: KTable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When performing a KTable to KTable join after aggregation, there are duplicates 
> in the resulting KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicate
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-06 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7595.

Resolution: Not A Bug

> Kafka Streams: KTable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When performing a KTable to KTable join after aggregation, there are duplicates 
> in the resulting KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicate
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7560:

Description: 
Currently metricValue

 

The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
when I run it locally. It produces the following error message:
{code:java}
 File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in 
validate     metric.value for k, metrics in 
producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
client_id=producer.client_id) for metric in metrics ValueError: max() arg is an 
empty sequence
{code}
I assume it cannot find the metric it's searching for

  was:
The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
when I run it locally. It produces the following error message:


{code:java}
 File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in 
validate     metric.value for k, metrics in 
producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
client_id=producer.client_id) for metric in metrics ValueError: max() arg is an 
empty sequence
{code}
I assume it cannot find the metric it's searching for


> PushHttpMetricsReporter should not convert metric value to double
> -
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> Currently metricValue
>  
> The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
> when I run it locally. It produces the following error message:
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, 
> in validate     metric.value for k, metrics in 
> producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
> client_id=producer.client_id) for metric in metrics ValueError: max() arg is 
> an empty sequence
> {code}
> I assume it cannot find the metric it's searching for



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7560:

Summary: PushHttpMetricsReporter should not convert metric value to double  
(was: Client Quota - system test failure)

> PushHttpMetricsReporter should not convert metric value to double
> -
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
> when I run it locally. It produces the following error message:
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, 
> in validate     metric.value for k, metrics in 
> producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
> client_id=producer.client_id) for metric in metrics ValueError: max() arg is 
> an empty sequence
> {code}
> I assume it cannot find the metric it's searching for



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-06 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677148#comment-16677148
 ] 

Jason Gustafson commented on KAFKA-7531:


[~spuzon] Thanks for the extra info. Yeah, let us know if you can reproduce it. 
Just looking at the trace, it's tough to tell what is causing the NPE. Which 
timeout are you referring to specifically? 

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe an NPE (NullPointerException) at the coordinator broker which causes 
> all application stream threads to shut down; here's the stack from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 

[jira] [Commented] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status

2018-11-06 Thread Colin P. McCabe (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677128#comment-16677128
 ] 

Colin P. McCabe commented on KAFKA-7599:


bq. I propose we allow for unbounded `targetMessagesPerSec` if the field is not 
present.

I guess the reason for having a limit by default is that it tends to give a 
better "out of the box" experience.  If you produce as fast as you can, you can 
often make the cluster less responsive for others, which can be annoying.  But 
I don't feel that strongly about it, I guess.

bq. Further, it would be very useful if some of these workers showed the 
`messagesPerSecond` they have been producing/consuming at. 

Yeah.  We may want a long-run and short-run average as well.

> Trogdor - Allow configuration for not throttling Benchmark Workers and expose 
> messages per second in task status
> 
>
> Key: KAFKA-7599
> URL: https://issues.apache.org/jira/browse/KAFKA-7599
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in 
> an argument called "targetMessagesPerSec". That argument works as an upper 
> bound on the number of messages that can be consumed/produced per second in 
> that worker.
> It is useful to support infinite messages per second. Currently, if the 
> `targetMessagesPerSec` field is not present in the request, the 
> RoundTripWorker will raise an exception, whereas the ConsumeBench and 
> ProduceBench workers will work as if they had `targetMessagesPerSec=10`.
> I propose we allow for unbounded `targetMessagesPerSec` if the field is not 
> present.
> Further, it would be very useful if some of these workers showed the 
> `messagesPerSecond` they have been producing/consuming at. 
> Even now, giving the worker a `targetMessagesPerSec` does not guarantee that 
> the worker will reach the needed `targetMessagesPerSec`. There is no easy way 
> of knowing how the worker performed - you have to subtract the status field 
> `startedMs` from `doneMs` to get the total duration of the task, convert it to 
> seconds and then divide the `maxMessages` field by that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7560) Client Quota - system test failure

2018-11-06 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677126#comment-16677126
 ] 

Dong Lin commented on KAFKA-7560:
-

Never mind. I just found the issue.

> Client Quota - system test failure
> --
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
> when I run it locally. It produces the following error message:
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, 
> in validate     metric.value for k, metrics in 
> producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
> client_id=producer.client_id) for metric in metrics ValueError: max() arg is 
> an empty sequence
> {code}
> I assume it cannot find the metric it's searching for



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7601) Handle message format downgrades during upgrade of message format version

2018-11-06 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7601:
---
Description: 
During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are on the same binary version, this 
typically does not cause any problems. Followers will take whatever message 
format is used by the leader. However, it is possible for leadership to change 
several times between brokers which support the new format and those which 
support the old format. This can cause the version used in the log to flap 
between the different formats until the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 1 begins using v2, then the message format will stabilize and 
everything will generally be ok.

Downgrades of the message format are problematic, even if they are just 
temporary. There are basically two issues:

1. We use the configured message format version to tell whether down-conversion 
is needed. We assume that this is always the maximum version used in the 
log, but that assumption fails in the case of a downgrade. In the worst case, 
old clients will see the new format and likely fail.

2. The logic we use for finding the truncation offset during the become 
follower transition does not handle flapping between message formats. When the 
new format is used by the leader, then the epoch cache will be updated 
correctly. When the old format is in use, the epoch cache won't be updated. 
This can lead to an incorrect result to OffsetsForLeaderEpoch queries.

We have actually observed the second problem. The scenario went something like 
this. Broker 1 is the leader of epoch 0 and writes some messages to the log 
using the v2 message format. Broker 2 then becomes the leader for epoch 1 and 
writes some messages in the v1 format. On broker 2, the last entry in the epoch 
cache is epoch 0. No entry is written for epoch 1 because it uses the old 
format. When broker 1 became a follower, it sent an OffsetsForLeaderEpoch query 
to broker 2 for epoch 0. Since epoch 0 was the last entry in the cache, the log 
end offset was returned. This resulted in localized log divergence.

There are a few options to fix this problem. From a high level, we can either 
be stricter about preventing downgrades of the message format, or we can add 
additional logic to make downgrades safe. 

(Disallow downgrades): As an example of the first approach, the leader could 
always use the maximum of the last version written to the log and the 
configured message format version. 

(Allow downgrades): If we want to allow downgrades, then it makes sense to 
invalidate and remove all entries in the epoch cache following the message 
format downgrade. This would basically force us to revert to truncation to the 
high watermark, which is what you'd expect from a downgrade.  We would also 
need a solution for the problem of detecting when down-conversion is needed for 
a fetch request. One option I've been thinking about is enforcing the invariant 
that each segment uses only one message format version. Whenever the message 
format changes, we need to roll a new segment. Then we can simply remember 
which format is in use by each segment to tell whether down-conversion is 
needed for a given fetch request.


  was:
During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are on the same version, this typically does 
not cause any problems. Followers will take whatever message format is used by 
the leader. However, it is possible for leadership to change several times 
between brokers which support the new format and those which support the old 
format. This can cause the version used in the log to flap between the 
different formats until the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 1 begins using v2, then the message format will stabilize and 
everything will generally be ok.

Downgrades of the message 

[jira] [Updated] (KAFKA-7601) Handle message format downgrades during upgrade of message format version

2018-11-06 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7601:
---
Description: 
During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are on the same version, this typically does 
not cause any problems. Followers will take whatever message format is used by 
the leader. However, it is possible for leadership to change several times 
between brokers which support the new format and those which support the old 
format. This can cause the version used in the log to flap between the 
different formats until the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 1 begins using v2, then the message format will stabilize and 
everything will generally be ok.

Downgrades of the message format are problematic, even if they are just 
temporary. There are basically two issues:

1. We use the configured message format version to tell whether down-conversion 
is needed. We assume that this is always the maximum version used in the 
log, but that assumption fails in the case of a downgrade. In the worst case, 
old clients will see the new format and likely fail.

2. The logic we use for finding the truncation offset during the become 
follower transition does not handle flapping between message formats. When the 
new format is used by the leader, then the epoch cache will be updated 
correctly. When the old format is in use, the epoch cache won't be updated. 
This can lead to an incorrect result to OffsetsForLeaderEpoch queries.

We have actually observed the second problem. The scenario went something like 
this. Broker 1 is the leader of epoch 0 and writes some messages to the log 
using the v2 message format. Broker 2 then becomes the leader for epoch 1 and 
writes some messages in the v1 format. On broker 2, the last entry in the epoch 
cache is epoch 0. No entry is written for epoch 1 because it uses the old 
format. When broker 1 became a follower, it sent an OffsetsForLeaderEpoch query 
to broker 2 for epoch 0. Since epoch 0 was the last entry in the cache, the log 
end offset was returned. This resulted in localized log divergence.

There are a few options to fix this problem. From a high level, we can either 
be stricter about preventing downgrades of the message format, or we can add 
additional logic to make downgrades safe. 

(Disallow downgrades): As an example of the first approach, the leader could 
always use the maximum of the last version written to the log and the 
configured message format version. 

(Allow downgrades): If we want to allow downgrades, then it makes sense to 
invalidate and remove all entries in the epoch cache following the message 
format downgrade. This would basically force us to revert to truncation to the 
high watermark, which is what you'd expect from a downgrade.  We would also 
need a solution for the problem of detecting when down-conversion is needed for 
a fetch request. One option I've been thinking about is enforcing the invariant 
that each segment uses only one message format version. Whenever the message 
format changes, we need to roll a new segment. Then we can simply remember 
which format is in use by each segment to tell whether down-conversion is 
needed for a given fetch request.


  was:
During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are on the same version, this typically does 
not cause any problems. Followers will take whatever message format is used by 
the leader. However, it is possible for leadership to change several times 
between brokers which support the new format and those which support the old 
format. This can cause the version used in the log to flap between the 
different formats until the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 1 begins using v2, then the message format will stabilize and 
everything will generally be ok.

Downgrades of the message format 

[jira] [Created] (KAFKA-7601) Handle message format downgrades during upgrade of message format version

2018-11-06 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7601:
--

 Summary: Handle message format downgrades during upgrade of 
message format version
 Key: KAFKA-7601
 URL: https://issues.apache.org/jira/browse/KAFKA-7601
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are on the same version, this typically does 
not cause any problems. Followers will take whatever message format is used by 
the leader. However, it is possible for leadership to change several times 
between brokers which support the new format and those which support the old 
format. This can cause the version used in the log to flap between the 
different formats until the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 1 begins using v2, then the message format will stabilize and 
everything will generally be ok.

Downgrades of the message format are problematic, even if they are just 
temporary. There are basically two issues:

1. We use the configured message format version to tell whether down-conversion 
is needed. We assume that this is always the maximum version used in the 
log, but that assumption fails in the case of a downgrade. In the worst case, 
old clients will see the new format and likely fail.

2. The logic we use for finding the truncation offset during the become 
follower transition does not handle flapping between message formats. When the 
new format is used by the leader, then the epoch cache will be updated 
correctly. When the old format is in use, the epoch cache won't be updated. 
This can lead to an incorrect result to OffsetsForLeaderEpoch queries.

For the second point, the specific case we observed is something like this. 
Broker 1 is the leader of epoch 0 and writes some messages to the log using the 
v2 message format. Broker 2 then becomes the leader for epoch 1 and writes some 
messages in the v1 format. On broker 2, the last entry in the epoch cache is 
epoch 0. No entry is written for epoch 1 because it uses the old format. When 
broker 1 became a follower, it sent an OffsetsForLeaderEpoch query to broker 2 
for epoch 0. Since epoch 0 was the last entry in the cache, the log end offset 
was returned. This resulted in localized log divergence.

There are a few options to fix this problem. From a high level, we can either 
be stricter about preventing downgrades of the message format, or we can add 
additional logic to make downgrades safe. 

(Disallow downgrades): As an example of the first approach, the leader could 
always use the maximum of the last version written to the log and the 
configured message format version. 

(Allow downgrades): If we want to allow downgrades, then it makes sense to 
invalidate and remove all entries in the epoch cache following the message 
format downgrade. We would also need a solution for the problem of detecting 
when down-conversion is needed for a fetch request. One option I've been 
thinking about is enforcing the invariant that each segment uses only one 
message format version. Whenever the message format changes, we need to roll a 
new segment. Then we can simply remember which format is in use by each segment 
to tell whether down-conversion is needed for a given fetch request.
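
To make the two directions concrete, a rough sketch in Java-like code (the names 
are invented for illustration; the actual broker code is Scala):
{code:java}
// Illustrative sketch only; method and variable names are made up.

// (Disallow downgrades): never let the effective format drop below what the log already contains.
static byte effectiveFormatVersion(byte configuredVersion, byte lastVersionWrittenToLog) {
    return (byte) Math.max(configuredVersion, lastVersionWrittenToLog);
}

// (Allow downgrades): if each segment records the single format it contains, deciding whether a
// fetch needs down-conversion becomes a per-segment lookup rather than a guess from the config.
static boolean needsDownConversion(byte segmentFormatVersion, byte fetcherMaxSupportedVersion) {
    return segmentFormatVersion > fetcherMaxSupportedVersion;
}
{code}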




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6890) Add connector level configurability for producer/consumer client configs

2018-11-06 Thread Allen Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677049#comment-16677049
 ] 

Allen Tang commented on KAFKA-6890:
---

I really appreciate your input, Randall! 

The KIP to accompany this JIRA is here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-296%3A+Connector+level+configurability+for+client+configs
To respond to your concerns, I should also add that defining bootstrap.servers 
at the connector level and then subsequently changing the bootstrap.servers 
configuration at the worker level is actually something that we have performed 
in the past as it was the most graceful solution to what we were trying to 
achieve. We've had an occasion where we were tasked with provisioning an 
entirely new Kafka cluster, migrating all topics from the old cluster to the 
new cluster, and reconfiguring all producers and consumers that were 
interfacing with the old cluster to instead point to the new cluster -- Kafka 
Connect connectors included.

In the absence of a bootstrap.servers overriding capability on a 
connector-by-connector basis, we would only have the worker bootstrap.servers 
to go by, which meant we would have to face a highly coordinated 
inter-departmental effort with over forty connectors within the Kafka Connect 
cluster to account for, along with all of their downstream business-facing 
real-time implications after flipping the cluster-wide switch. 

By providing configurability of bootstrap.servers at the connector-level, 
connectors became decoupled from one another, and they were no longer required 
to read/write data where the internal topics live, allowing for customers to 
migrate their connectors over to the new Kafka cluster at their own pace and on 
their own schedule. Eventually, the three internal topics used for Kafka 
Connect were mirrored over to the new Kafka cluster and the cluster-wide 
bootstrap.servers configuration change was applied, thereby fully decoupling 
Kafka Connect from the older Kafka cluster.

If blacklisting overrides for specific client configs, like bootstrap.servers, 
is something you feel strongly about, we may be able to achieve this via 
whitelisting of client configs, defined by administrators of Kafka Connect 
clusters, at the cluster-level within worker properties. Let me know your 
thoughts on this.

> Add connector level configurability for producer/consumer client configs
> 
>
> Key: KAFKA-6890
> URL: https://issues.apache.org/jira/browse/KAFKA-6890
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Allen Tang
>Priority: Minor
>
> Right now, each source connector and sink connector inherit their client 
> configurations from the worker properties. Within the worker properties, all 
> configurations that have a prefix of "producer." or "consumer." are applied 
> to all source connectors and sink connectors respectively.
> We should also provide connector-level overrides whereby connector properties 
> that are prefixed with "producer." and "consumer." are used to feed into the 
> producer and consumer clients embedded within source and sink connectors 
> respectively. The prefixes will be removed via a String#substring() call, and 
> the remainder of the connector property key will be used as the client 
> configuration key. The value is fed directly to the client as the 
> configuration value.
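
As a side note, a minimal sketch of the prefix handling described above (the 
helper name and its placement are hypothetical, not the actual Connect code):
{code:java}
// Sketch: collect "producer."- or "consumer."-prefixed connector properties into client configs.
static Map<String, Object> clientOverrides(Map<String, String> connectorProps, String prefix) {
    Map<String, Object> overrides = new HashMap<>();
    for (Map.Entry<String, String> entry : connectorProps.entrySet()) {
        if (entry.getKey().startsWith(prefix)) {
            // Strip the prefix; the remainder becomes the client configuration key.
            overrides.put(entry.getKey().substring(prefix.length()), entry.getValue());
        }
    }
    return overrides;
}
// e.g. clientOverrides(props, "producer.") maps "producer.bootstrap.servers" to "bootstrap.servers"
{code}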



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676944#comment-16676944
 ] 

Randall Hauch commented on KAFKA-7593:
--

[~olkuznsmith], have you tried manually setting the {{max.message.bytes}} 
config to a larger value on the {{connect-configs}} topic in the broker? If so, 
then this is a good workaround in case other users run into this.

However, as you suggest, we should evaluate how to respond when attempts to 
write to the config topic fail. Logging might be easy (e.g., add a callback 
when the KafkaConfigBackingStore sends messages to the log), but it still might 
be difficult to prevent the loop.

It might also be good to set {{max.message.bytes}} to a larger value when 
creating the topic (no KIP required). Or, we could expose a new property for 
the internal topics on the distributed worker to allow users to set this 
(requires a KIP). IMO, you can always create this topic ahead of time, so maybe 
simply increasing the default {{max.message.bytes}} topic config is sufficient.
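
For anyone who needs the workaround right away, a sketch of the first suggestion 
using the Java AdminClient (the topic name and size are examples; note that 
alterConfigs replaces the topic's existing dynamic overrides, so include any 
others you rely on):
{code:java}
// Sketch: raise max.message.bytes on the Connect config topic so large task configs fit.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // example bootstrap server
try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "connect-configs");
    Config config = new Config(Collections.singletonList(
            new ConfigEntry("max.message.bytes", "2097152")));  // ~2 MB, example value
    admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
}
{code}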

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> When the config message for the config topic is larger than the Kafka broker 
> allows to be stored, the source connector enters an infinite restart loop 
> without any error indication.
> There could be an exception thrown in this case, or smarter handling of big 
> configs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException

2018-11-06 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676940#comment-16676940
 ] 

Rajini Sivaram commented on KAFKA-3980:
---

[~jintianfan] Do you have quotas enabled? If your broker is still running, you 
could attach jconsole and see if there are metrics which should have been 
expired. If not, we could take a look at the heap dump.

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.1
>Reporter: Andrew Jorgensen
>Priority: Major
>
> I have some nodes in a kafka cluster that occasionally will run out of memory 
> whenever I restart the producers. I was able to take a heap dump from both a 
> recently restarted Kafka node, which weighed in at about 20 MB, and a node that 
> has been running for 2 months and is using over 700 MB of memory. Looking at the 
> heap dump it looks like the JmxReporter is holding on to metrics and causing 
> them to build up over time. 
> !http://imgur.com/N6Cd0Ku.png!
> !http://imgur.com/kQBqA2j.png!
> The ultimate problem this causes is that there is a chance when I restart the 
> producers it will cause the node to experience a Java heap space exception 
> and OOM. The nodes then fail to start up correctly and write a -1 as the 
> leader number to the partitions they were responsible for, effectively 
> resetting the offset and rendering that partition unavailable. The Kafka 
> process then needs to be restarted in order to re-assign the node to the 
> partition that it owns.
> I have a few questions:
> 1. I am not quite sure why there are so many client id entries in that 
> JmxReporter map.
> 2. Is there a way to have the JmxReporter release metrics after a set amount 
> of time or a way to turn certain high cardinality metrics like these off?
> I can provide any logs or heap dumps if more information is needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-11-06 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676909#comment-16676909
 ] 

Rajini Sivaram commented on KAFKA-7280:
---

[~sachinu] The fix is available in 2.0.1 which is expected to be released very 
soon. The specific exception in this JIRA is in a code path that is used only 
when TRACE logging is enabled on the consumer. The fix in this JIRA makes the 
whole class thread-safe, but if you are seeing the exception without TRACE, can 
you include the stack trace, so that we can verify if the PR has fixed that 
too? Thanks.

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we 
> are using it in the Kafka consumer without any synchronization even though poll() 
> from the heartbeat thread can process responses. The heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header=
> Unknown macro: 
> \{partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null}
> ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 
> bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] 

[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676896#comment-16676896
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   …erId
   
   Check running in `Sender.maybeWaitForProducerId`
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676895#comment-16676895
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2e7a0..6df77677f89 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -492,7 +492,7 @@ private Node awaitLeastLoadedNodeReady(long 
remainingTimeMs) throws IOException
 }
 
 private void maybeWaitForProducerId() {
-while (!transactionManager.hasProducerId() && 
!transactionManager.hasError()) {
+while (running && !transactionManager.hasProducerId() && 
!transactionManager.hasError()) {
 Node node = null;
 try {
 node = awaitLeastLoadedNodeReady(requestTimeoutMs);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7600) Provide capability to rename cluster id

2018-11-06 Thread Yeva Byzek (JIRA)
Yeva Byzek created KAFKA-7600:
-

 Summary: Provide capability to rename cluster id
 Key: KAFKA-7600
 URL: https://issues.apache.org/jira/browse/KAFKA-7600
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Yeva Byzek


Enhancement suggestion: ability to configure the cluster id that is displayed 
in ZK {{/cluster/id}} to be something more human readable like {{datacenter1}} 
instead of random characters like {{YLD3M3faTFG7ftEvoDGn5Q}}.

Value add: downstream clients that use the cluster id can present users with a 
more meaningful cluster identification.

Other relevant links: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status

2018-11-06 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-7599:
---
Summary: Trogdor - Allow configuration for not throttling Benchmark Workers 
and expose messages per second in task status  (was: Trogdor - Allow 
configuration for not throttling Benchmark Workers and expose messages per 
second)

> Trogdor - Allow configuration for not throttling Benchmark Workers and expose 
> messages per second in task status
> 
>
> Key: KAFKA-7599
> URL: https://issues.apache.org/jira/browse/KAFKA-7599
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in 
> an argument called "targetMessagesPerSec". That argument works as an upper 
> bound on the number of messages that can be consumed/produced per second in 
> that worker.
> It is useful to support infinite messages per second. Currently, if the 
> `targetMessagesPerSec` field is not present in the request, the 
> RoundTripWorker will raise an exception, whereas the ConsumeBench and 
> ProduceBench workers will work as if they had `targetMessagesPerSec=10`.
> I propose we allow for unbounded `targetMessagesPerSec` if the field is not 
> present.
> Further, it would be very useful if some of these workers showed the 
> `messagesPerSecond` they have been producing/consuming at. 
> Even now, giving the worker a `targetMessagesPerSec` does not guarantee that 
> the worker will reach the needed `targetMessagesPerSec`. There is no easy way 
> of knowing how the worker performed - you have to subtract the status field 
> `startedMs` from `doneMs` to get the total duration of the task, convert it to 
> seconds and then divide the `maxMessages` field by that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second

2018-11-06 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7599:
--

 Summary: Trogdor - Allow configuration for not throttling 
Benchmark Workers and expose messages per second
 Key: KAFKA-7599
 URL: https://issues.apache.org/jira/browse/KAFKA-7599
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in an 
argument called "targetMessagesPerSec". That argument works as an upper bound 
on the number of messages that can be consumed/produced per second in that 
worker.

It is useful to support infinite messages per second. Currently, if the 
`targetMessagesPerSec` field is not present in the request, the RoundTripWorker 
will raise an exception, whereas the ConsumeBench and ProduceBench workers will 
work as if they had `targetMessagesPerSec=10`.

I propose we allow for unbounded `targetMessagesPerSec` if the field is not 
present.
Further, it would be very useful if some of these workers showed the 
`messagesPerSecond` they have been producing/consuming at. 
Even now, giving the worker a `targetMessagesPerSec` does not guarantee that 
the worker will reach the needed `targetMessagesPerSec`. There is no easy way 
of knowing how the worker performed - you have to subtract the status field 
`startedMs` from `doneMs` to get the total duration of the task, convert it to 
seconds and then divide the `maxMessages` field by that.
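
As an aside, that back-of-the-envelope calculation looks roughly like this (a 
sketch; `status` stands for the deserialized task status and the field names 
follow the description above):
{code:java}
// Rough throughput estimate from the existing task status fields.
long durationMs = status.doneMs() - status.startedMs();          // total task duration
double messagesPerSecond = maxMessages / (durationMs / 1000.0);  // average produce/consume rate
{code}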



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676835#comment-16676835
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   …erId
   
   Check running in `Sender.maybeWaitForProducerId`
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676834#comment-16676834
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2e7a0..6df77677f89 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -492,7 +492,7 @@ private Node awaitLeastLoadedNodeReady(long 
remainingTimeMs) throws IOException
 }
 
 private void maybeWaitForProducerId() {
-while (!transactionManager.hasProducerId() && 
!transactionManager.hasError()) {
+while (running && !transactionManager.hasProducerId() && 
!transactionManager.hasError()) {
 Node node = null;
 try {
 node = awaitLeastLoadedNodeReady(requestTimeoutMs);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7598) SIGSEGV on scala library Set

2018-11-06 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676816#comment-16676816
 ] 

Ismael Juma commented on KAFKA-7598:


This is a JVM bug and you should report it to the OpenJDK project if it hasn't 
been reported already.

> SIGSEGV on scala library Set
> 
>
> Key: KAFKA-7598
> URL: https://issues.apache.org/jira/browse/KAFKA-7598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: Docker CentOs image 7.3.1611 upgraded to 7.4.1708 
> (https://hub.docker.com/r/library/centos/tags/)
> java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64
>Reporter: Antoine Tran
>Priority: Major
>
> We had a crash that appears randomly, with a SIGSEGV in the Scala Set 
> library:
>  
> [2018-09-24 20:52:04,568] INFO Updated PartitionLeaderEpoch. 
> New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: 
> fac---MTI2RTM-rtmForecast2d-RCFD_2017100_0260-4. Cache now contains 0 
> entries. (kafka.server.epoch.LeaderEpochFileCache)
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7fdb3998c814, pid=1, tid=0x7fd9a4588700
> #
> # JRE version: OpenJDK Runtime Environment (8.0_161-b14) (build 1.8.0_161-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.161-b14 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # J 6249 C2 scala.collection.immutable.Set$EmptySet$.$minus(Ljava/lang/Object;)Lscala/collection/generic/Subtractable; (6 bytes) @ 0x7fdb3998c814 [0x7fdb3998c7e0+0x34]
> #
> # Core dump written. Default location: //core or core.1
> #
> # An error report file with more information is saved as:
> # //hs_err_pid1.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> [error occurred during error reporting , id 0xb]
> I don't have the core dump yet; I have asked our team to capture it next time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676787#comment-16676787
 ] 

ASF GitHub Bot commented on KAFKA-7597:
---

stanislavkozlovski opened a new pull request #5885: KAFKA-7597: Make Trogdor 
ProduceBenchWorker support transactions
URL: https://github.com/apache/kafka/pull/5885
 
 
   It now accepts a new "messagesPerTransaction" field, which, if present, will 
enable transactional functionality in the bench worker.
   The producer will open N transactions with X messages each (bounded by the 
mandatory "maxMessages" field)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Support transactions in ProduceBenchWorker
> 
>
> Key: KAFKA-7597
> URL: https://issues.apache.org/jira/browse/KAFKA-7597
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka 
> Producer.
> It would prove useful if we supported transactions in this producer, so as to 
> allow benchmarks that use transactions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7598) SIGSEGV on scala library Set

2018-11-06 Thread Antoine Tran (JIRA)
Antoine Tran created KAFKA-7598:
---

 Summary: SIGSEGV on scala library Set
 Key: KAFKA-7598
 URL: https://issues.apache.org/jira/browse/KAFKA-7598
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.0
 Environment: Docker CentOs image 7.3.1611 upgraded to 7.4.1708 
(https://hub.docker.com/r/library/centos/tags/)
java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64
Reporter: Antoine Tran


We had a crash that appears randomly, with a SIGSEGV in the Scala Set library:

 

[2018-09-24 20:52:04,568] INFO Updated PartitionLeaderEpoch. 
New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: 
fac---MTI2RTM-rtmForecast2d-RCFD_2017100_0260-4. Cache now contains 0 
entries. (kafka.server.epoch.LeaderEpochFileCache)
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7fdb3998c814, pid=1, tid=0x7fd9a4588700
#
# JRE version: OpenJDK Runtime Environment (8.0_161-b14) (build 1.8.0_161-b14)
# Java VM: OpenJDK 64-Bit Server VM (25.161-b14 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# J 6249 C2 scala.collection.immutable.Set$EmptySet$.$minus(Ljava/lang/Object;)Lscala/collection/generic/Subtractable; (6 bytes) @ 0x7fdb3998c814 [0x7fdb3998c7e0+0x34]
#
# Core dump written. Default location: //core or core.1
#
# An error report file with more information is saved as:
# //hs_err_pid1.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

[error occurred during error reporting , id 0xb]

I don't have the core dump yet; I have asked our team to capture it next time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-06 Thread Damian Guy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676728#comment-16676728
 ] 

Damian Guy commented on KAFKA-7595:
---

[~vvcephei] your reasoning seems valid to me

> Kafka Streams: KTable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When performing a KTable-to-KTable join after aggregation, there are duplicates in 
> the resulting KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  
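
For reference, a consolidated, self-contained sketch of the third (caching-enabled) variant described above; the topic names, serdes, and key/value types are assumptions for illustration, and the linked demo app differs in its details:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class RatingAverageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rating-averages");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
        // The caching and commit-interval settings from variant 3 above.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream<Long, Double> ratingsById = builder
                .stream("ratings", Consumed.with(Serdes.Long(), Serdes.Double()))
                .groupByKey();

        KTable<Long, Long> ratingCounts = ratingsById.count();
        KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
        KTable<Long, Double> ratingAverage = ratingSums.join(
                ratingCounts,
                (sum, count) -> sum / count.doubleValue(),
                Materialized.as("average-ratings"));

        ratingAverage.toStream()
                .to("average-ratings-topic", Produced.with(Serdes.Long(), Serdes.Double()));

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}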



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException

2018-11-06 Thread Jin Tianfan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676724#comment-16676724
 ] 

Jin Tianfan commented on KAFKA-3980:


[~ijuma] [~rsivaram]

I am hitting the same problem. My Kafka broker version is kafka_2.11-0.11.0.0. We ran 
jmap -histo:live <pid>; this is the output:

--

num #instances #bytes class name
--
 1: 10468626 1241525032 [C
 2: 5186139 448500296 [Ljava.util.HashMap$Node;
 3: 10468427 251242248 java.lang.String
 4: 10389530 249348720 javax.management.ObjectName$Property
 5: 10372951 249121296 [Ljavax.management.ObjectName$Property;
 6: 5179185 248600880 java.util.HashMap
 7: 5186476 207459040 javax.management.ObjectName
 8: 5240552 167697664 java.util.HashMap$Node
 9: 6302 139607712 [B
 10: 5176173 124228152 org.apache.kafka.common.metrics.JmxReporter$KafkaMbean
 11: 5176210 82819360 java.util.HashMap$EntrySet
 12: 90003 2160072 java.util.concurrent.ConcurrentSkipListMap$Node
 13: 84784 2034816 java.lang.Double
 14: 45662 1461184 java.util.concurrent.ConcurrentHashMap$Node
 15: 25106 1244408 [Ljava.lang.Object;
 16: 43453 1042872 java.util.concurrent.ConcurrentSkipListMap$Index
 17: 18418 736720 java.util.LinkedHashMap$Entry

--

Our JVM options are as below:

/data/program/java/bin/java -Xmx4G -Xms4G -server -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 
-XX:+DisableExplicitGC -Djava.awt.headless=true 
-Xloggc:/data/program/kafka/kafka_2.11-0.11.0.0/bin/../logs/kafkaServer-gc.log 
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dkafka.logs.dir=/data/program/kafka/kafka_2.11-0.11.0.0/bin/../logs 
-Dlog4j.configuration=

 

We found far too much metrics data in our broker's memory. The broker has been running 
healthily for nearly one year. I reviewed the code: only ReplicationQuotaManager and 
ClientQuotaManager set a sensor expiration time of 1 hour; all other sensors set the 
expiration time to Long.MAX_VALUE. Is this what causes so many metrics in my heap? If 
you need it, I will send you my dump file.

I hope to receive your reply as soon as possible.
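
As a side note, one quick way to confirm whether the growth is in registered Kafka MBeans (which JmxReporter holds on to) is to count them over JMX. The following is a minimal sketch using only the standard javax.management API; it assumes a remote JMX port (e.g. 9999) is configured, which is not shown in the options above:

{code:java}
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaMBeanCount {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with a remote JMX port,
        // e.g. -Dcom.sun.management.jmxremote.port=9999 (not in the options quoted above).
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Count every registered MBean whose domain starts with "kafka";
            // a count that keeps growing (e.g. one group of beans per short-lived
            // client id) points at metrics that are never expired or removed.
            Set<ObjectName> all = conn.queryNames(null, null);
            long kafkaBeans = all.stream()
                    .filter(name -> name.getDomain().startsWith("kafka"))
                    .count();
            System.out.println("kafka* MBeans registered: " + kafkaBeans);
        }
    }
}
{code}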

 

 

 

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.1
>Reporter: Andrew Jorgensen
>Priority: Major
>
> I have some nodes in a kafka cluster that occasionally will run out of memory 
> whenever I restart the producers. I was able to take a heap dump from both a 
> recently restarted Kafka node which weighed in at about 20 MB and a node that 
> has been running for 2 months is using over 700MB of memory. Looking at the 
> heap dump it looks like the JmxReporter is holding on to metrics and causing 
> them to build up over time. 
> !http://imgur.com/N6Cd0Ku.png!
> !http://imgur.com/kQBqA2j.png!
> The ultimate problem this causes is that there is a chance that, when I restart the 
> producers, the node will experience a Java heap space exception and OOM. The nodes 
> then fail to start up correctly and write a -1 as the leader number to the partitions 
> they were responsible for, effectively resetting the offset and rendering those 
> partitions unavailable. The Kafka process then needs to be restarted in order to 
> re-assign the node to the partitions that it owns.
> I have a few questions:
> 1. I am not quite sure why there are so many client id entries in that 
> JmxReporter map.
> 2. Is there a way to have the JmxReporter release metrics after a set amount 
> of time or a way to turn certain high cardinality metrics like these off?
> I can provide any logs or heap dumps if more information is needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676714#comment-16676714
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2e7a0..6df77677f89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -492,7 +492,7 @@ private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException
     }
 
     private void maybeWaitForProducerId() {
-        while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             Node node = null;
             try {
                 node = awaitLeastLoadedNodeReady(requestTimeoutMs);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676715#comment-16676715
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   …erId
   
   Check running in `Sender.maybeWaitForProducerId`
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker

2018-11-06 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7597:
--

 Summary: Trogdor - Support transactions in ProduceBenchWorker
 Key: KAFKA-7597
 URL: https://issues.apache.org/jira/browse/KAFKA-7597
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka Producer.
It would prove useful if we supported transactions in this producer, so as to 
allow benchmarks that use transactions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676621#comment-16676621
 ] 

ASF GitHub Bot commented on KAFKA-5503:
---

layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores 
shutdown while fetching Produc…
URL: https://github.com/apache/kafka/pull/5881
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2e7a0..6df77677f89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -492,7 +492,7 @@ private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException
     }
 
     private void maybeWaitForProducerId() {
-        while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             Node node = null;
             try {
                 node = awaitLeastLoadedNodeReady(requestTimeoutMs);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7560) Client Quota - system test failure

2018-11-06 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676512#comment-16676512
 ] 

Dong Lin commented on KAFKA-7560:
-

Initially I thought the test failure was related to the quota logic in this test, and 
thus that I would be one of the best people to debug it. Now it seems that 
the test failed because the test suite is not able to read metrics from the 
producer using the solution developed in 
[https://github.com/apache/kafka/pull/4072]. More specifically, the log messages 
show that 5 messages are successfully produced and consumed, but do_POST 
in http.py is never called, and thus we get the exception shown in the Jira 
description.

[~ewencp] [~apurva] could you find time to take a look, since you are probably 
more familiar with the HTTP-based approach to sending metrics here? I will also 
try to debug further.

> Client Quota - system test failure
> --
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
> when I run it locally. It produces the following error message:
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, 
> in validate     metric.value for k, metrics in 
> producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
> client_id=producer.client_id) for metric in metrics ValueError: max() arg is 
> an empty sequence
> {code}
> I assume it cannot find the metric it's searching for



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7560) Client Quota - system test failure

2018-11-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7560:

Priority: Blocker  (was: Major)

> Client Quota - system test failure
> --
>
> Key: KAFKA-7560
> URL: https://issues.apache.org/jira/browse/KAFKA-7560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Stanislav Kozlovski
>Assignee: Dong Lin
>Priority: Blocker
>
> The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails 
> when I run it locally. It produces the following error message:
> {code:java}
>  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, 
> in validate     metric.value for k, metrics in 
> producer.metrics(group='producer-metrics', name='outgoing-byte-rate', 
> client_id=producer.client_id) for metric in metrics ValueError: max() arg is 
> an empty sequence
> {code}
> I assume it cannot find the metric it's searching for



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)