[jira] [Commented] (KAFKA-2818) Clean up Controller Object on forced Resignation

2016-02-26 Thread Matthew Bruce (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15169173#comment-15169173
 ] 

Matthew Bruce commented on KAFKA-2818:
--

[~fpj] You would definitely know that code better than me.  If you think that's 
the route to go then it sounds good to me.

> Clean up Controller Object on forced Resignation
> 
>
> Key: KAFKA-2818
> URL: https://issues.apache.org/jira/browse/KAFKA-2818
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Flavio Junqueira
>Priority: Minor
> Attachments: KAFKA-2818.patch
>
>
> Currently, if the controller is forced to resign (because an exception is
> caught during updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest or
> shutdownBroker), the ZooKeeper resignation callback onControllerResignation
> never gets a chance to execute, which leaves some artifacts lying around. In
> particular, the Sensors don't get cleaned up, so if the Kafka broker is ever
> re-elected as controller it will fail because some metrics already exist. An
> error and stack trace of such an event are below.
> A forced resignation can be induced fairly easily with a misconfiguration in
> broker.properties, by setting only a SASL_PLAINTEXT listener and setting
> inter.broker.protocol.version=0.8.2.X:
> {code}
> listeners=SASL_PLAINTEXT://:9092
> inter.broker.protocol.version=0.8.2.X
> security.inter.broker.protocol=SASL_PLAINTEXT
> {code}
> {code}
> [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on broker 182050300 (kafka.server.ZookeeperLeaderElector)
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics, description=Connections closed per second in the window., tags={broker-id=182050300}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>     at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
>     at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
>     at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
>     at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>     at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43)
>     at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819)
>     at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747)
>     at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330)
>     at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163)
>     at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
>     at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
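
For context on the failure quoted above: the clients-side metrics registry rejects a second 
registration of the same MetricName, which is exactly what happens when a broker that never 
ran the resignation cleanup wins the election again. A minimal, standalone Scala sketch of 
that behaviour (demo code, not taken from Kafka; it assumes the public MetricName constructor 
and Sensor.add from the 0.9 clients jar):

{code}
// Standalone sketch (not Kafka code): registering the same MetricName twice in one
// Metrics registry throws the IllegalArgumentException seen in the stack trace above.
import java.util.Collections
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.Rate

object DuplicateMetricDemo extends App {
  val metrics = new Metrics()
  val name = new MetricName("connection-close-rate", "controller-channel-metrics",
    "Connections closed per second in the window.",
    Collections.singletonMap("broker-id", "182050300"))

  // First election: the controller channel manager's Selector registers its sensors.
  metrics.sensor("connections-closed").add(name, new Rate())

  // Re-election without onControllerResignation() having run: the old sensor is still
  // registered, so this second add() throws
  // "A metric named '...' already exists, can't register another one."
  metrics.sensor("connections-closed-again").add(name, new Rate())
}
{code}

The cleanup performed by onControllerResignation() (shutting down the controller channel 
manager and its Selector, which removes these sensors) is what the attached patch forces to 
run on the forced-resignation path.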



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2818) Clean up Controller Object on forced Resignation

2015-11-12 Thread Matthew Bruce (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Bruce updated KAFKA-2818:
-
Status: Patch Available  (was: Open)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7c03a24..a48ffb2 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -278,6 +278,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       error("Forcing the controller to resign")
       brokerRequestBatch.clear()
       controllerElector.resign()
+      // Run the resignation callback directly as it doesn't get called after the exception is propagated
+      onControllerResignation()
 
       throw e
     }
@@ -913,6 +915,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       error("Forcing the controller to resign")
       brokerRequestBatch.clear()
       controllerElector.resign()
+      // Run the resignation callback directly as it doesn't get called after the exception is propagated
+      onControllerResignation()
 
       throw e
     }
@@ -1033,6 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       error("Forcing the controller to resign")
       brokerRequestBatch.clear()
       controllerElector.resign()
+      // Run the resignation callback directly as it doesn't get called after the exception is propagated
+      onControllerResignation()
 
       throw e
     }
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index c6f80ac..d4d1f50 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -106,7 +106,7 @@ class TopicDeletionManager(controller: KafkaController,
    */
   def shutdown() {
     // Only allow one shutdown to go through
-    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
+    if (isDeleteTopicEnabled && deleteTopicsThread != null && deleteTopicsThread.initiateShutdown()) {
       // Resume the topic deletion so it doesn't block on the condition
       resumeTopicDeletionThread()
       // Await delete topic thread to exit
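
For readers skimming the patch: a self-initiated resignation from inside an exception handler 
never goes through the ZooKeeper listener that would normally invoke onControllerResignation(), 
so the callback has to be run by hand before the exception is rethrown. A minimal, 
self-contained sketch of that pattern (hypothetical names, not the actual controller code):

{code}
// Sketch only (hypothetical names): resign, run the cleanup callback explicitly,
// then rethrow so the caller still sees the original failure. In the real patch the
// cleanup is onControllerResignation(), which removes sensors, shuts down the
// channel manager, and stops the topic deletion thread.
object ForcedResignationSketch {
  def resign(): Unit = println("resigning controller leadership")
  def onResignation(): Unit = println("cleaning up metrics, channel manager, deletion thread")

  def sendOrResign(send: () => Unit): Unit =
    try send()
    catch {
      case e: Throwable =>
        resign()
        onResignation() // the ZooKeeper callback will not fire on this path
        throw e
    }

  def main(args: Array[String]): Unit =
    try sendOrResign(() => throw new RuntimeException("broker unreachable"))
    catch { case e: RuntimeException => println(s"propagated: ${e.getMessage}") }
}
{code}

The TopicDeletionManager hunk then guards shutdown() against deleteTopicsThread being null, 
presumably because with this change the cleanup can now run before the deletion thread has 
ever been started.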


> Clean up Controller Object on forced Resignation
> 
>
> Key: KAFKA-2818
> URL: https://issues.apache.org/jira/browse/KAFKA-2818
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: KAFKA-2818.patch
>
>
> Currently, if the controller is forced to resign (because an exception is
> caught during updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest or
> shutdownBroker), the ZooKeeper resignation callback onControllerResignation
> never gets a chance to execute, which leaves some artifacts lying around. In
> particular, the Sensors don't get cleaned up, so if the Kafka broker is ever
> re-elected as controller it will fail because some metrics already exist. An
> error and stack trace of such an event are below.
> A forced resignation can be induced fairly easily with a misconfiguration in
> broker.properties, by setting only a SASL_PLAINTEXT listener and setting
> inter.broker.protocol.version=0.8.2.X:
> {code}
> listeners=SASL_PLAINTEXT://:9092
> inter.broker.protocol.version=0.8.2.X
> security.inter.broker.protocol=SASL_PLAINTEXT
> {code}
> {code}
> [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on broker 182050300 (kafka.server.ZookeeperLeaderElector)
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics, description=Connections closed per second in the window., tags={broker-id=182050300}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>     at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
>     at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
>     at kafka.control

[jira] [Created] (KAFKA-2818) Clean up Controller Object on forced Resignation

2015-11-12 Thread Matthew Bruce (JIRA)
Matthew Bruce created KAFKA-2818:


 Summary: Clean up Controller Object on forced Resignation
 Key: KAFKA-2818
 URL: https://issues.apache.org/jira/browse/KAFKA-2818
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.9.0.0
Reporter: Matthew Bruce
Assignee: Neha Narkhede
Priority: Minor
 Attachments: KAFKA-2818.patch

Currently, if the controller is forced to resign (because an exception is
caught during updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest or
shutdownBroker), the ZooKeeper resignation callback onControllerResignation
never gets a chance to execute, which leaves some artifacts lying around. In
particular, the Sensors don't get cleaned up, so if the Kafka broker is ever
re-elected as controller it will fail because some metrics already exist. An
error and stack trace of such an event are below.

A forced resignation can be induced fairly easily with a misconfiguration in
broker.properties, by setting only a SASL_PLAINTEXT listener and setting
inter.broker.protocol.version=0.8.2.X:

{code}
listeners=SASL_PLAINTEXT://:9092
inter.broker.protocol.version=0.8.2.X
security.inter.broker.protocol=SASL_PLAINTEXT
{code}



{code}
[2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on broker 182050300 (kafka.server.ZookeeperLeaderElector)
java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics, description=Connections closed per second in the window., tags={broker-id=182050300}]' already exists, can't register another one.
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
    at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
    at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43)
    at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2818) Clean up Controller Object on forced Resignation

2015-11-12 Thread Matthew Bruce (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Bruce updated KAFKA-2818:
-
Attachment: KAFKA-2818.patch

> Clean up Controller Object on forced Resignation
> 
>
> Key: KAFKA-2818
> URL: https://issues.apache.org/jira/browse/KAFKA-2818
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: KAFKA-2818.patch
>
>
> Currently, if the controller is forced to resign (because an exception is
> caught during updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest or
> shutdownBroker), the ZooKeeper resignation callback onControllerResignation
> never gets a chance to execute, which leaves some artifacts lying around. In
> particular, the Sensors don't get cleaned up, so if the Kafka broker is ever
> re-elected as controller it will fail because some metrics already exist. An
> error and stack trace of such an event are below.
> A forced resignation can be induced fairly easily with a misconfiguration in
> broker.properties, by setting only a SASL_PLAINTEXT listener and setting
> inter.broker.protocol.version=0.8.2.X:
> {code}
> listeners=SASL_PLAINTEXT://:9092
> inter.broker.protocol.version=0.8.2.X
> security.inter.broker.protocol=SASL_PLAINTEXT
> {code}
> {code}
> [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on broker 182050300 (kafka.server.ZookeeperLeaderElector)
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics, description=Connections closed per second in the window., tags={broker-id=182050300}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>     at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
>     at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
>     at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
>     at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>     at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43)
>     at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819)
>     at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747)
>     at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330)
>     at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163)
>     at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
>     at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2756) Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses wrong protocol version

2015-11-05 Thread Matthew Bruce (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Bruce updated KAFKA-2756:
-
Attachment: KAFKA-2756.patch

> Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses 
> wrong protocol version
> -
>
> Key: KAFKA-2756
> URL: https://issues.apache.org/jira/browse/KAFKA-2756
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2756.patch
>
>
> During a rolling upgrade from 0.8.2.1 to 0.9.0.0, replication between 0.9.0.0
> and 0.8.2.1 brokers fails because
> org.apache.kafka.clients.NetworkClient#handleCompletedReceives always uses
> the latest version of each API key's schema instead of the one specified by
> inter.broker.protocol.version.
> This line should not use ProtoUtils.currentResponseSchema; it should call
> ProtoUtils.responseSchema and specify a version explicitly:
> {code}
> Struct body = (Struct) 
> ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
> {code}
> This results in WARN messages like the following in the server.log file as 
> the responses are decoded with the wrong Schema:
> {code}
> [2015-11-05 19:13:10,309] WARN [ReplicaFetcherThread-0-182050600], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6cc18858. Possible 
> cause: org.apache.kafka.common.protocol.types.SchemaException: Error reading 
> field 'responses': Error reading field 'topic': 
> java.nio.BufferUnderflowException (kafka.server.ReplicaFetcherThread)
> {code}
> {code}
> [2015-11-03 16:55:15,178] WARN [ReplicaFetcherThread-1-182050600], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@224388b2. Possible 
> cause: org.apache.kafka.common.protocol.types.SchemaException: Error reading 
> field 'responses': Error reading field 'partition_responses': Error reading 
> field 'record_set': java.lang.IllegalArgumentException 
> (kafka.server.ReplicaFetcherThread)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2756) Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses wrong protocol version

2015-11-05 Thread Matthew Bruce (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Bruce updated KAFKA-2756:
-
Affects Version/s: 0.9.0.0
   Status: Patch Available  (was: Open)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 2c56751..a253e6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -459,7 +459,8 @@ public class NetworkClient implements KafkaClient {
         ClientRequest req = inFlightRequests.completeNext(source);
         ResponseHeader header = ResponseHeader.parse(receive.payload());
         short apiKey = req.request().header().apiKey();
-        Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+        short apiVer = req.request().header().apiVersion();
+        Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
         correlate(req.request().header(), header);
         if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
             responses.add(new ClientResponse(req, now, false, body));
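
The essence of the fix is to decode each response with the schema version recorded in the 
request's own header rather than the newest schema the client knows about. A small 
self-contained sketch of that idea (made-up types, not Kafka's ProtoUtils):

{code}
// Sketch only (hypothetical types): pick the response schema by the API version
// recorded in the request header, not the newest known version.
object VersionedDecodeSketch {
  final case class Schema(version: Short) {
    def read(payload: Array[Byte]): String = s"decoded as v$version"
  }
  private val fetchResponseSchemas: Map[Short, Schema] =
    Map(0.toShort -> Schema(0.toShort), 1.toShort -> Schema(1.toShort))

  // What the old code effectively did: always use the latest schema.
  def currentResponseSchema: Schema = fetchResponseSchemas(fetchResponseSchemas.keys.max)
  // What the patch does: use the version the request was sent with.
  def responseSchema(requestVersion: Short): Schema = fetchResponseSchemas(requestVersion)

  def main(args: Array[String]): Unit = {
    val requestVersion: Short = 0 // an 0.8.2.1 peer answered a v0 fetch request
    val payload = Array.emptyByteArray
    println(currentResponseSchema.read(payload))          // wrong: parses v0 bytes as v1
    println(responseSchema(requestVersion).read(payload)) // right: parses them as v0
  }
}
{code}

That is what the one-line change above does by reading apiVersion from the request header and 
passing it to ProtoUtils.responseSchema.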

> Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses 
> wrong protocol version
> -
>
> Key: KAFKA-2756
> URL: https://issues.apache.org/jira/browse/KAFKA-2756
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
>
> During a rolling upgrade from 0.8.2.1 to 0.9.0.0, replication between 0.9.0.0
> and 0.8.2.1 brokers fails because
> org.apache.kafka.clients.NetworkClient#handleCompletedReceives always uses
> the latest version of each API key's schema instead of the one specified by
> inter.broker.protocol.version.
> This line should not use ProtoUtils.currentResponseSchema; it should call
> ProtoUtils.responseSchema and specify a version explicitly:
> {code}
> Struct body = (Struct) 
> ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
> {code}
> This results in WARN messages like the following in the server.log file as 
> the responses are decoded with the wrong Schema:
> {code}
> [2015-11-05 19:13:10,309] WARN [ReplicaFetcherThread-0-182050600], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6cc18858. Possible 
> cause: org.apache.kafka.common.protocol.types.SchemaException: Error reading 
> field 'responses': Error reading field 'topic': 
> java.nio.BufferUnderflowException (kafka.server.ReplicaFetcherThread)
> {code}
> {code}
> [2015-11-03 16:55:15,178] WARN [ReplicaFetcherThread-1-182050600], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@224388b2. Possible 
> cause: org.apache.kafka.common.protocol.types.SchemaException: Error reading 
> field 'responses': Error reading field 'partition_responses': Error reading 
> field 'record_set': java.lang.IllegalArgumentException 
> (kafka.server.ReplicaFetcherThread)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2756) Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses wrong protocol version

2015-11-05 Thread Matthew Bruce (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Bruce updated KAFKA-2756:
-
Summary: Replication Broken between Kafka 0.8.2.1 and 0.9 - 
NetworkClient.java uses wrong protocol version  (was: Replication Broken 
between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses wrong protocl version)

> Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses 
> wrong protocol version
> -
>
> Key: KAFKA-2756
> URL: https://issues.apache.org/jira/browse/KAFKA-2756
> Project: Kafka
>  Issue Type: Bug
>Reporter: Matthew Bruce
>
> During a rolling upgrade from 0.8.2.1 to 0.9.0.0, replication between 0.9.0.0
> and 0.8.2.1 brokers fails because
> org.apache.kafka.clients.NetworkClient#handleCompletedReceives always uses
> the latest version of each API key's schema instead of the one specified by
> inter.broker.protocol.version.
> This line should not use ProtoUtils.currentResponseSchema; it should call
> ProtoUtils.responseSchema and specify a version explicitly:
> {code}
> Struct body = (Struct) 
> ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
> {code}
> This results in WARN messages like the following in the server.log file as 
> the responses are decoded with the wrong Schema:
> {code}
> [2015-11-05 19:13:10,309] WARN [ReplicaFetcherThread-0-182050600], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6cc18858. Possible 
> cause: org.apache.kafka.common.protocol.types.SchemaException: Error reading 
> field 'responses': Error reading field 'topic': 
> java.nio.BufferUnderflowException (kafka.server.ReplicaFetcherThread)
> {code}
> {code}
> [2015-11-03 16:55:15,178] WARN [ReplicaFetcherThread-1-182050600], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@224388b2. Possible 
> cause: org.apache.kafka.common.protocol.types.SchemaException: Error reading 
> field 'responses': Error reading field 'partition_responses': Error reading 
> field 'record_set': java.lang.IllegalArgumentException 
> (kafka.server.ReplicaFetcherThread)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2756) Replication Broken between Kafka 0.8.2.1 and 0.9 - NetworkClient.java uses wrong protocl version

2015-11-05 Thread Matthew Bruce (JIRA)
Matthew Bruce created KAFKA-2756:


 Summary: Replication Broken between Kafka 0.8.2.1 and 0.9 - 
NetworkClient.java uses wrong protocl version
 Key: KAFKA-2756
 URL: https://issues.apache.org/jira/browse/KAFKA-2756
 Project: Kafka
  Issue Type: Bug
Reporter: Matthew Bruce


During a rolling upgrade from 0.8.2.1 to 0.9.0.0, replication between 0.9.0.0
and 0.8.2.1 brokers fails because
org.apache.kafka.clients.NetworkClient#handleCompletedReceives always uses the
latest version of each API key's schema instead of the one specified by
inter.broker.protocol.version.

This line should not use ProtoUtils.currentResponseSchema; it should call
ProtoUtils.responseSchema and specify a version explicitly:
{code}
Struct body = (Struct) 
ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
{code}

This results in WARN messages like the following in the server.log file as the 
responses are decoded with the wrong Schema:
{code}
[2015-11-05 19:13:10,309] WARN [ReplicaFetcherThread-0-182050600], Error in 
fetch kafka.server.ReplicaFetcherThread$FetchRequest@6cc18858. Possible cause: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading field 'topic': java.nio.BufferUnderflowException 
(kafka.server.ReplicaFetcherThread)
{code}
{code}
[2015-11-03 16:55:15,178] WARN [ReplicaFetcherThread-1-182050600], Error in 
fetch kafka.server.ReplicaFetcherThread$FetchRequest@224388b2. Possible cause: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading field 'partition_responses': Error reading field 
'record_set': java.lang.IllegalArgumentException 
(kafka.server.ReplicaFetcherThread)
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)