kafka git commit: KAFKA-2300: Error in controller log when broker tries to rejoin cluster
Repository: kafka Updated Branches: refs/heads/trunk be633a713 - a62d63007 KAFKA-2300: Error in controller log when broker tries to rejoin cluster Author: flavio junqueira f...@apache.org Reviewers: Ismael Juma, Guozhang Wang Closes #102 from fpj/2300 and squashes the following commits: 7bd2edb [flavio junqueira] KAFKA-2300: Removed unnecessary s occurrences. aa6ec90 [flavio junqueira] KAFKA-2300: Wrapped all occurences of sendRequestToBrokers with try/catch and fixed string typo. f1261b1 [flavio junqueira] Fixed some style issues. 9b6390a [flavio junqueira] Updated package name and removed unnecessary imports. dbd1bf3 [flavio junqueira] KAFKA-2300: Error in controller log when broker tries to rejoin cluster Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62d6300 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62d6300 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62d6300 Branch: refs/heads/trunk Commit: a62d63007c89c0c6f7ad62fe4643f7adc7fbc661 Parents: be633a7 Author: flavio junqueira f...@apache.org Authored: Wed Aug 12 14:31:39 2015 -0700 Committer: Guozhang Wang wangg...@gmail.com Committed: Wed Aug 12 14:31:39 2015 -0700 -- .../controller/ControllerChannelManager.scala | 102 +- .../kafka/controller/KafkaController.scala | 52 -- .../controller/ControllerFailoverTest.scala | 187 +++ 3 files changed, 288 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/a62d6300/core/src/main/scala/kafka/controller/ControllerChannelManager.scala -- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 9f521fa..4396b6e 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse import collection.Set class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { - private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] + protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = [Channel manager on controller + config.brokerId + ]: @@ -100,7 +100,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } } - private def startRequestSendThread(brokerId: Int) { + protected def startRequestSendThread(brokerId: Int) { val requestThread = brokerStateInfo(brokerId).requestSendThread if(requestThread.getState == Thread.State.NEW) requestThread.start() @@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { -leaderAndIsrRequestMap.foreach { m = - val broker = m._1 - val partitionStateInfos = m._2.toMap - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b = leaderIds.contains(b.id)).map(b = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) - for (p - partitionStateInfos) { -val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) become-leader else become-follower -stateChangeLogger.trace((Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d + - for partition [%s,%d]).format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, - p._1._1, p._1._2)) +try { + leaderAndIsrRequestMap.foreach { m = +val broker = m._1 +val partitionStateInfos = m._2.toMap +val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet +val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b = leaderIds.contains(b.id)).map(b = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) +val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) +for (p - partitionStateInfos) { +
kafka git commit: KAFKA-2429: Add annotations to mark classes as stable/unstable
Repository: kafka Updated Branches: refs/heads/trunk 3902dc024 - 04b0d870b KAFKA-2429: Add annotations to mark classes as stable/unstable This also marks the consumer as unstable to show an example of using these annotations. Author: Ewen Cheslack-Postava m...@ewencp.org Reviewers: Gwen Shapira Closes #133 from ewencp/stability-annotations and squashes the following commits: 09c15c3 [Ewen Cheslack-Postava] KAFKA-2429: Add annotations to mark classes as stable/unstable Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04b0d870 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04b0d870 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04b0d870 Branch: refs/heads/trunk Commit: 04b0d870b263117e301584bfc00dd8e81486617a Parents: 3902dc0 Author: Ewen Cheslack-Postava m...@ewencp.org Authored: Wed Aug 12 14:57:42 2015 -0700 Committer: Gwen Shapira csh...@gmail.com Committed: Wed Aug 12 14:57:42 2015 -0700 -- .../apache/kafka/clients/consumer/Consumer.java | 2 + .../kafka/clients/consumer/KafkaConsumer.java | 2 + .../common/annotation/InterfaceStability.java | 48 3 files changed, 52 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/04b0d870/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 158e1ea..76834ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -21,11 +21,13 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.annotation.InterfaceStability; /** * @see KafkaConsumer * @see MockConsumer */ +@InterfaceStability.Unstable public interface ConsumerK, V extends Closeable { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/04b0d870/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index ed99e9b..be46b6c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -390,6 +391,7 @@ import static org.apache.kafka.common.utils.Utils.min; * commit. * */ +@InterfaceStability.Unstable public class KafkaConsumerK, V implements ConsumerK, V { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/04b0d870/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java b/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java new file mode 100644 index 000..0d38f56 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/annotation/InterfaceStability.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an AS IS BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.annotation; + +import
[1/2] kafka git commit: KAFKA-1782: fix JUnit3 Misuse
Repository: kafka Updated Branches: refs/heads/trunk 04b0d870b - 78685dc16 http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala -- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 39a6852..f846698 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -25,7 +25,6 @@ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.junit.{After, Before, Test} -import org.scalatest.junit.JUnit3Suite import java.util.Properties import java.io.File @@ -33,9 +32,9 @@ import java.io.File import scala.util.Random import scala.collection._ -import junit.framework.Assert._ +import org.junit.Assert._ -class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { +class OffsetCommitTest extends ZooKeeperTestHarness { val random: Random = new Random() val group = test-group val retentionCheckInterval: Long = 100L http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala -- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index a3a03db..dead087 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -17,7 +17,7 @@ package kafka.server -import org.scalatest.junit.JUnit3Suite +import org.junit.{After, Before} import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage @@ -25,11 +25,12 @@ import kafka.serializer.StringEncoder import kafka.utils.{TestUtils} import kafka.common._ -class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { +class ReplicaFetchTest extends ZooKeeperTestHarness { var brokers: Seq[KafkaServer] = null val topic1 = foo val topic2 = bar + @Before override def setUp() { super.setUp() brokers = createBrokerConfigs(2, zkConnect, false) @@ -37,6 +38,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { .map(config = TestUtils.createServer(config)) } + @After override def tearDown() { brokers.foreach(_.shutdown()) super.tearDown() http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala -- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 00d5933..3770cb4 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -27,12 +27,11 @@ import java.io.File import org.apache.kafka.common.protocol.Errors import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient -import org.scalatest.junit.JUnit3Suite import org.junit.Test import scala.collection.Map -class ReplicaManagerTest extends JUnit3Suite { +class ReplicaManagerTest { val topic = test-topic @@ -84,7 +83,7 @@ class ReplicaManagerTest extends JUnit3Suite { rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback) -rm.shutdown(false); +rm.shutdown(false) TestUtils.verifyNonDaemonThreadsStatus http://git-wip-us.apache.org/repos/asf/kafka/blob/78685dc1/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala -- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 12269cd..1185a6f 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -20,18 +20,18 @@ import java.util.Properties import kafka.zk.ZooKeeperTestHarness import kafka.utils.{TestUtils, CoreUtils} -import org.junit.Test -import org.scalatest.junit.JUnit3Suite -import junit.framework.Assert._ +import org.junit.{Before, Test} +import org.junit.Assert._ import java.io.File -class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { +class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { var props1: Properties = null var config1: KafkaConfig = null var props2: Properties = null var config2: KafkaConfig = null val brokerMetaPropsFile = meta.properties + @Before override def setUp() {
[2/2] kafka git commit: KAFKA-1782: fix JUnit3 Misuse
KAFKA-1782: fix JUnit3 Misuse Author: Ewen Cheslack-Postava m...@ewencp.org Reviewers: Ewen Cheslack-Postava, Guozhang Wang Closes #135 from ewencp/kafka-1782-junit3-misusage and squashes the following commits: 0ae6258 [Ewen Cheslack-Postava] KAFKA-1782: Junit3 Misusage Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78685dc1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78685dc1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78685dc1 Branch: refs/heads/trunk Commit: 78685dc162a867c0b8870faf5baa6a098953ddc8 Parents: 04b0d87 Author: Alexander Pakulov a.paku...@gmail.com Authored: Wed Aug 12 19:58:44 2015 -0700 Committer: Guozhang Wang wangg...@gmail.com Committed: Wed Aug 12 19:58:44 2015 -0700 -- .../kafka/api/ConsumerBounceTest.scala | 2 + .../integration/kafka/api/ConsumerTest.scala| 2 + .../kafka/api/IntegrationTestHarness.scala | 5 ++- .../kafka/api/ProducerBounceTest.scala | 4 +- .../kafka/api/ProducerCompressionTest.scala | 3 +- .../kafka/api/ProducerFailureHandlingTest.scala | 14 +++ .../kafka/api/ProducerSendTest.scala| 8 ++-- .../test/scala/unit/kafka/KafkaConfigTest.scala | 2 +- .../unit/kafka/admin/AddPartitionsTest.scala| 8 ++-- .../test/scala/unit/kafka/admin/AdminTest.scala | 3 +- .../unit/kafka/admin/ConfigCommandTest.scala| 7 +--- .../kafka/admin/DeleteConsumerGroupTest.scala | 3 +- .../unit/kafka/admin/DeleteTopicTest.scala | 3 +- .../unit/kafka/admin/TopicCommandTest.scala | 5 +-- .../scala/unit/kafka/api/ApiUtilsTest.scala | 2 +- .../api/RequestResponseSerializationTest.scala | 2 +- .../unit/kafka/cluster/BrokerEndPointTest.scala | 3 +- .../scala/unit/kafka/common/ConfigTest.scala| 2 +- .../scala/unit/kafka/common/TopicTest.scala | 2 +- .../kafka/consumer/ConsumerIteratorTest.scala | 9 ++--- .../kafka/consumer/PartitionAssignorTest.scala | 5 +-- .../unit/kafka/consumer/TopicFilterTest.scala | 2 +- .../ZookeeperConsumerConnectorTest.scala| 8 ++-- .../ConsumerCoordinatorResponseTest.scala | 6 +-- .../coordinator/ConsumerGroupMetadataTest.scala | 2 +- .../coordinator/CoordinatorMetadataTest.scala | 2 +- .../coordinator/PartitionAssignorTest.scala | 2 +- .../kafka/integration/AutoOffsetResetTest.scala | 9 +++-- .../unit/kafka/integration/FetcherTest.scala| 11 +++--- .../integration/KafkaServerTestHarness.scala| 12 +++--- .../kafka/integration/MinIsrConfigTest.scala| 3 +- .../kafka/integration/PrimitiveApiTest.scala| 5 +-- .../ProducerConsumerTestHarness.scala | 40 ++-- .../kafka/integration/RollingBounceTest.scala | 8 ++-- .../kafka/integration/TopicMetadataTest.scala | 10 +++-- .../integration/UncleanLeaderElectionTest.scala | 12 +++--- .../ZookeeperConsumerConnectorTest.scala| 6 +-- .../message/BaseMessageSetTestCases.scala | 2 +- .../message/ByteBufferMessageSetTest.scala | 2 +- .../test/scala/unit/kafka/log/CleanerTest.scala | 17 + .../unit/kafka/log/FileMessageSetTest.scala | 2 +- .../kafka/log/LogCleanerIntegrationTest.scala | 4 +- .../scala/unit/kafka/log/LogConfigTest.scala| 4 +- .../scala/unit/kafka/log/LogManagerTest.scala | 18 - .../scala/unit/kafka/log/LogSegmentTest.scala | 8 +--- .../src/test/scala/unit/kafka/log/LogTest.scala | 2 +- .../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +- .../scala/unit/kafka/log/OffsetMapTest.scala| 2 +- .../kafka/message/BaseMessageSetTestCases.scala | 2 +- .../message/ByteBufferMessageSetTest.scala | 2 +- .../kafka/message/MessageCompressionTest.scala | 2 +- .../scala/unit/kafka/message/MessageTest.scala | 2 +- .../unit/kafka/message/MessageWriterTest.scala | 2 +- .../unit/kafka/metrics/KafkaTimerTest.scala | 5 +-- .../scala/unit/kafka/metrics/MetricsTest.scala | 9 ++--- .../unit/kafka/network/SocketServerTest.scala | 27 +++-- .../unit/kafka/producer/AsyncProducerTest.scala | 15 ++-- .../unit/kafka/producer/ProducerTest.scala | 28 +++--- .../unit/kafka/producer/SyncProducerTest.scala | 5 +-- .../unit/kafka/server/AdvertiseBrokerTest.scala | 8 ++-- .../kafka/server/DelayedOperationTest.scala | 15 .../kafka/server/DynamicConfigChangeTest.scala | 3 +- .../server/HighwatermarkPersistenceTest.scala | 3 +- .../unit/kafka/server/ISRExpirationTest.scala | 12 +++--- .../unit/kafka/server/KafkaConfigTest.scala | 4 +- .../unit/kafka/server/LeaderElectionTest.scala | 8 ++-- .../scala/unit/kafka/server/LogOffsetTest.scala | 5 +-- .../unit/kafka/server/LogRecoveryTest.scala | 6 ++- .../unit/kafka/server/OffsetCommitTest.scala| 5 +-- .../unit/kafka/server/ReplicaFetchTest.scala| 6 ++-