kafka git commit: KAFKA-2300: Error in controller log when broker tries to rejoin cluster

2015-08-12 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk be633a713 - a62d63007


KAFKA-2300: Error in controller log when broker tries to rejoin cluster

Author: flavio junqueira f...@apache.org

Reviewers: Ismael Juma, Guozhang Wang

Closes #102 from fpj/2300 and squashes the following commits:

7bd2edb [flavio junqueira] KAFKA-2300: Removed unnecessary s occurrences.
aa6ec90 [flavio junqueira] KAFKA-2300: Wrapped all occurences of 
sendRequestToBrokers with try/catch and fixed string typo.
f1261b1 [flavio junqueira] Fixed some style issues.
9b6390a [flavio junqueira] Updated package name and removed unnecessary imports.
dbd1bf3 [flavio junqueira] KAFKA-2300: Error in controller log when broker 
tries to rejoin cluster


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62d6300
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62d6300
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62d6300

Branch: refs/heads/trunk
Commit: a62d63007c89c0c6f7ad62fe4643f7adc7fbc661
Parents: be633a7
Author: flavio junqueira f...@apache.org
Authored: Wed Aug 12 14:31:39 2015 -0700
Committer: Guozhang Wang wangg...@gmail.com
Committed: Wed Aug 12 14:31:39 2015 -0700

--
 .../controller/ControllerChannelManager.scala   | 102 +-
 .../kafka/controller/KafkaController.scala  |  52 --
 .../controller/ControllerFailoverTest.scala | 187 +++
 3 files changed, 288 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/a62d6300/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
--
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 9f521fa..4396b6e 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse
 import collection.Set
 
 class ControllerChannelManager (private val controllerContext: 
ControllerContext, config: KafkaConfig) extends Logging {
-  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
+  protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = [Channel manager on controller  + config.brokerId + ]: 
 
@@ -100,7 +100,7 @@ class ControllerChannelManager (private val 
controllerContext: ControllerContext
 }
   }
 
-  private def startRequestSendThread(brokerId: Int) {
+  protected def startRequestSendThread(brokerId: Int) {
 val requestThread = brokerStateInfo(brokerId).requestSendThread
 if(requestThread.getState == Thread.State.NEW)
   requestThread.start()
@@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
-leaderAndIsrRequestMap.foreach { m =
-  val broker = m._1
-  val partitionStateInfos = m._2.toMap
-  val leaderIds = 
partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-  val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b = 
leaderIds.contains(b.id)).map(b = 
b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
-  val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, 
leaders, controllerId, controllerEpoch, correlationId, clientId)
-  for (p - partitionStateInfos) {
-val typeOfRequest = if (broker == 
p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) become-leader else 
become-follower
-stateChangeLogger.trace((Controller %d epoch %d sending %s 
LeaderAndIsr request %s with correlationId %d to broker %d  +
- for partition [%s,%d]).format(controllerId, 
controllerEpoch, typeOfRequest,
- 
p._2.leaderIsrAndControllerEpoch, correlationId, broker,
- p._1._1, 
p._1._2))
+try {
+  leaderAndIsrRequestMap.foreach { m =
+val broker = m._1
+val partitionStateInfos = m._2.toMap
+val leaderIds = 
partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
+val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b = 
leaderIds.contains(b.id)).map(b = 
b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
+val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, 
leaders, controllerId, controllerEpoch, correlationId, clientId)
+for (p - partitionStateInfos) {
+  

kafka git commit: KAFKA-2429: Add annotations to mark classes as stable/unstable

2015-08-12 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 3902dc024 - 04b0d870b


KAFKA-2429: Add annotations to mark classes as stable/unstable

This also marks the consumer as unstable to show an example of using these 
annotations.

Author: Ewen Cheslack-Postava m...@ewencp.org

Reviewers: Gwen Shapira

Closes #133 from ewencp/stability-annotations and squashes the following 
commits:

09c15c3 [Ewen Cheslack-Postava] KAFKA-2429: Add annotations to mark classes as 
stable/unstable


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04b0d870
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04b0d870
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04b0d870

Branch: refs/heads/trunk
Commit: 04b0d870b263117e301584bfc00dd8e81486617a
Parents: 3902dc0
Author: Ewen Cheslack-Postava m...@ewencp.org
Authored: Wed Aug 12 14:57:42 2015 -0700
Committer: Gwen Shapira csh...@gmail.com
Committed: Wed Aug 12 14:57:42 2015 -0700

--
 .../apache/kafka/clients/consumer/Consumer.java |  2 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +
 .../common/annotation/InterfaceStability.java   | 48 
 3 files changed, 52 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/04b0d870/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 158e1ea..76834ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -21,11 +21,13 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
  * @see KafkaConsumer
  * @see MockConsumer
  */
+@InterfaceStability.Unstable
 public interface ConsumerK, V extends Closeable {
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/04b0d870/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ed99e9b..be46b6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -390,6 +391,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  * commit.
  * 
  */
+@InterfaceStability.Unstable
 public class KafkaConsumerK, V implements ConsumerK, V {
 
 private static final Logger log = 
LoggerFactory.getLogger(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/04b0d870/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java
 
b/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java
new file mode 100644
index 000..0d38f56
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the License); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an AS IS BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.annotation;
+
+import 

[1/2] kafka git commit: KAFKA-1782: fix JUnit3 Misuse

2015-08-12 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 04b0d870b - 78685dc16


http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 39a6852..f846698 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -25,7 +25,6 @@ import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 
 import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnit3Suite
 
 import java.util.Properties
 import java.io.File
@@ -33,9 +32,9 @@ import java.io.File
 import scala.util.Random
 import scala.collection._
 
-import junit.framework.Assert._
+import org.junit.Assert._
 
-class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
+class OffsetCommitTest extends ZooKeeperTestHarness {
   val random: Random = new Random()
   val group = test-group
   val retentionCheckInterval: Long = 100L

http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index a3a03db..dead087 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
@@ -25,11 +25,12 @@ import kafka.serializer.StringEncoder
 import kafka.utils.{TestUtils}
 import kafka.common._
 
-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
+class ReplicaFetchTest extends ZooKeeperTestHarness  {
   var brokers: Seq[KafkaServer] = null
   val topic1 = foo
   val topic2 = bar
 
+  @Before
   override def setUp() {
 super.setUp()
 brokers = createBrokerConfigs(2, zkConnect, false)
@@ -37,6 +38,7 @@ class ReplicaFetchTest extends JUnit3Suite with 
ZooKeeperTestHarness  {
   .map(config = TestUtils.createServer(config))
   }
 
+  @After
   override def tearDown() {
 brokers.foreach(_.shutdown())
 super.tearDown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 00d5933..3770cb4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -27,12 +27,11 @@ import java.io.File
 import org.apache.kafka.common.protocol.Errors
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 
 import scala.collection.Map
 
-class ReplicaManagerTest extends JUnit3Suite {
+class ReplicaManagerTest {
 
   val topic = test-topic
 
@@ -84,7 +83,7 @@ class ReplicaManagerTest extends JUnit3Suite {
 
 rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = 
false, messagesPerPartition = produceRequest.data, responseCallback = callback)
 
-rm.shutdown(false);
+rm.shutdown(false)
 
 TestUtils.verifyNonDaemonThreadsStatus
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
--
diff --git 
a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 12269cd..1185a6f 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -20,18 +20,18 @@ import java.util.Properties
 
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestUtils, CoreUtils}
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{Before, Test}
+import org.junit.Assert._
 import java.io.File
 
-class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness 
{
+class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   var props1: Properties = null
   var config1: KafkaConfig = null
   var props2: Properties = null
   var config2: KafkaConfig = null
   val brokerMetaPropsFile = meta.properties
 
+  @Before
   override def setUp() {
 

[2/2] kafka git commit: KAFKA-1782: fix JUnit3 Misuse

2015-08-12 Thread guozhang
KAFKA-1782: fix JUnit3 Misuse

Author: Ewen Cheslack-Postava m...@ewencp.org

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #135 from ewencp/kafka-1782-junit3-misusage and squashes the following 
commits:

0ae6258 [Ewen Cheslack-Postava] KAFKA-1782: Junit3 Misusage


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78685dc1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78685dc1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78685dc1

Branch: refs/heads/trunk
Commit: 78685dc162a867c0b8870faf5baa6a098953ddc8
Parents: 04b0d87
Author: Alexander Pakulov a.paku...@gmail.com
Authored: Wed Aug 12 19:58:44 2015 -0700
Committer: Guozhang Wang wangg...@gmail.com
Committed: Wed Aug 12 19:58:44 2015 -0700

--
 .../kafka/api/ConsumerBounceTest.scala  |  2 +
 .../integration/kafka/api/ConsumerTest.scala|  2 +
 .../kafka/api/IntegrationTestHarness.scala  |  5 ++-
 .../kafka/api/ProducerBounceTest.scala  |  4 +-
 .../kafka/api/ProducerCompressionTest.scala |  3 +-
 .../kafka/api/ProducerFailureHandlingTest.scala | 14 +++
 .../kafka/api/ProducerSendTest.scala|  8 ++--
 .../test/scala/unit/kafka/KafkaConfigTest.scala |  2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala|  8 ++--
 .../test/scala/unit/kafka/admin/AdminTest.scala |  3 +-
 .../unit/kafka/admin/ConfigCommandTest.scala|  7 +---
 .../kafka/admin/DeleteConsumerGroupTest.scala   |  3 +-
 .../unit/kafka/admin/DeleteTopicTest.scala  |  3 +-
 .../unit/kafka/admin/TopicCommandTest.scala |  5 +--
 .../scala/unit/kafka/api/ApiUtilsTest.scala |  2 +-
 .../api/RequestResponseSerializationTest.scala  |  2 +-
 .../unit/kafka/cluster/BrokerEndPointTest.scala |  3 +-
 .../scala/unit/kafka/common/ConfigTest.scala|  2 +-
 .../scala/unit/kafka/common/TopicTest.scala |  2 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |  9 ++---
 .../kafka/consumer/PartitionAssignorTest.scala  |  5 +--
 .../unit/kafka/consumer/TopicFilterTest.scala   |  2 +-
 .../ZookeeperConsumerConnectorTest.scala|  8 ++--
 .../ConsumerCoordinatorResponseTest.scala   |  6 +--
 .../coordinator/ConsumerGroupMetadataTest.scala |  2 +-
 .../coordinator/CoordinatorMetadataTest.scala   |  2 +-
 .../coordinator/PartitionAssignorTest.scala |  2 +-
 .../kafka/integration/AutoOffsetResetTest.scala |  9 +++--
 .../unit/kafka/integration/FetcherTest.scala| 11 +++---
 .../integration/KafkaServerTestHarness.scala| 12 +++---
 .../kafka/integration/MinIsrConfigTest.scala|  3 +-
 .../kafka/integration/PrimitiveApiTest.scala|  5 +--
 .../ProducerConsumerTestHarness.scala   | 40 ++--
 .../kafka/integration/RollingBounceTest.scala   |  8 ++--
 .../kafka/integration/TopicMetadataTest.scala   | 10 +++--
 .../integration/UncleanLeaderElectionTest.scala | 12 +++---
 .../ZookeeperConsumerConnectorTest.scala|  6 +--
 .../message/BaseMessageSetTestCases.scala   |  2 +-
 .../message/ByteBufferMessageSetTest.scala  |  2 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala | 17 +
 .../unit/kafka/log/FileMessageSetTest.scala |  2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  4 +-
 .../scala/unit/kafka/log/LogConfigTest.scala|  4 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   | 18 -
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  8 +---
 .../src/test/scala/unit/kafka/log/LogTest.scala |  2 +-
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  2 +-
 .../scala/unit/kafka/log/OffsetMapTest.scala|  2 +-
 .../kafka/message/BaseMessageSetTestCases.scala |  2 +-
 .../message/ByteBufferMessageSetTest.scala  |  2 +-
 .../kafka/message/MessageCompressionTest.scala  |  2 +-
 .../scala/unit/kafka/message/MessageTest.scala  |  2 +-
 .../unit/kafka/message/MessageWriterTest.scala  |  2 +-
 .../unit/kafka/metrics/KafkaTimerTest.scala |  5 +--
 .../scala/unit/kafka/metrics/MetricsTest.scala  |  9 ++---
 .../unit/kafka/network/SocketServerTest.scala   | 27 +++--
 .../unit/kafka/producer/AsyncProducerTest.scala | 15 ++--
 .../unit/kafka/producer/ProducerTest.scala  | 28 +++---
 .../unit/kafka/producer/SyncProducerTest.scala  |  5 +--
 .../unit/kafka/server/AdvertiseBrokerTest.scala |  8 ++--
 .../kafka/server/DelayedOperationTest.scala | 15 
 .../kafka/server/DynamicConfigChangeTest.scala  |  3 +-
 .../server/HighwatermarkPersistenceTest.scala   |  3 +-
 .../unit/kafka/server/ISRExpirationTest.scala   | 12 +++---
 .../unit/kafka/server/KafkaConfigTest.scala |  4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  8 ++--
 .../scala/unit/kafka/server/LogOffsetTest.scala |  5 +--
 .../unit/kafka/server/LogRecoveryTest.scala |  6 ++-
 .../unit/kafka/server/OffsetCommitTest.scala|  5 +--
 .../unit/kafka/server/ReplicaFetchTest.scala|  6 ++-