kafka git commit: KAFKA-2198: kafka-topics.sh exits with 0 status on failures; patched by Manikumar Reddy reviewed by Gwen Shapira
Repository: kafka Updated Branches: refs/heads/trunk bdbb9672f -> a7e0ac365 KAFKA-2198: kafka-topics.sh exits with 0 status on failures; patched by Manikumar Reddy reviewed by Gwen Shapira Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7e0ac36 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7e0ac36 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7e0ac36 Branch: refs/heads/trunk Commit: a7e0ac3659c2b499124a866bc0b16b6b1b412376 Parents: bdbb967 Author: Manikumar Reddy manikumar.re...@gmail.com Authored: Mon Jul 13 22:08:33 2015 -0700 Committer: Gwen Shapira csh...@gmail.com Committed: Mon Jul 13 22:08:33 2015 -0700 -- core/src/main/scala/kafka/admin/TopicCommand.scala | 17 +++-- 1 file changed, 11 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/a7e0ac36/core/src/main/scala/kafka/admin/TopicCommand.scala -- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index a2ecb96..a90aa87 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils import kafka.coordinator.ConsumerCoordinator -object TopicCommand { +object TopicCommand extends Logging { def main(args: Array[String]): Unit = { @@ -48,7 +48,7 @@ object TopicCommand { opts.checkArgs() val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 3, 3) - +var exitCode = 0 try { if(opts.options.has(opts.createOpt)) createTopic(zkClient, opts) @@ -62,11 +62,14 @@ object TopicCommand { deleteTopic(zkClient, opts) } catch { case e: Throwable => -println("Error while executing topic command " + e.getMessage) -println(Utils.stackTrace(e)) +println("Error while executing topic command : " + e.getMessage) +error(Utils.stackTrace(e)) +exitCode = 1 } finally { zkClient.close() + System.exit(exitCode) } + }
private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { @@ -97,7 +100,8 @@ object TopicCommand { def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) if (topics.length == 0) { - println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt))) + throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt), + opts.options.valueOf(opts.zkConnectOpt))) } topics.foreach { topic => val configs = AdminUtils.fetchTopicConfig(zkClient, topic) @@ -138,7 +142,8 @@ object TopicCommand { def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) if (topics.length == 0) { - println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt))) + throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt), + opts.options.valueOf(opts.zkConnectOpt))) } topics.foreach { topic => try {
kafka git commit: KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang
Repository: kafka Updated Branches: refs/heads/trunk 4aba4bc1d -> 69b451e28 KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69b451e2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69b451e2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69b451e2 Branch: refs/heads/trunk Commit: 69b451e28944deb162f7427105c3090f41c8797f Parents: 4aba4bc Author: Tim Brooks tbroo...@gmail.com Authored: Mon Jul 13 13:13:02 2015 -0700 Committer: Guozhang Wang wangg...@gmail.com Committed: Mon Jul 13 13:13:02 2015 -0700 -- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/69b451e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 7aa0760..b4e8f7f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.kafka.common.utils.Utils.min; @@ -395,6 +396,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; private static final long LATEST_OFFSET_TIMESTAMP = -1L; +private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private final Coordinator coordinator; @@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access -private final AtomicReference<Long> currentThread = new AtomicReference<Long>(); +private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); // refcount is used to allow reentrant access by the thread who has acquired currentThread private final AtomicInteger refcount = new AtomicInteger(0); @@ -1355,8 +1357,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { */ private void acquire() { ensureNotClosed(); -Long threadId = Thread.currentThread().getId(); -if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId)) +long threadId = Thread.currentThread().getId(); +if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet(); } @@ -1366,6 +1368,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { */ private void release() { if (refcount.decrementAndGet() == 0) -currentThread.set(null); +currentThread.set(NO_CURRENT_THREAD); } }
kafka git commit: kafka-972; MetadataRequest returns stale list of brokers; patched by Ashish Singh; reviewed by Jun Rao
Repository: kafka Updated Branches: refs/heads/trunk 69b451e28 - bdbb9672f kafka-972; MetadataRequest returns stale list of brokers; patched by Ashish Singh; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdbb9672 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdbb9672 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdbb9672 Branch: refs/heads/trunk Commit: bdbb9672f5e035fd00801037e2affe64811ec6ab Parents: 69b451e Author: Ashish Singh asi...@cloudera.com Authored: Mon Jul 13 17:16:34 2015 -0700 Committer: Jun Rao jun...@gmail.com Committed: Mon Jul 13 17:16:34 2015 -0700 -- .../kafka/controller/KafkaController.scala | 21 +-- .../kafka/integration/TopicMetadataTest.scala | 66 ++-- 2 files changed, 77 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/bdbb9672/core/src/main/scala/kafka/controller/KafkaController.scala -- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 20f1499..b4fc755 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -387,8 +387,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt /** * This callback is invoked by the replica state machine's broker change listener, with the list of newly started * brokers as input. It does the following - - * 1. Triggers the OnlinePartition state change for all new/offline partitions - * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If + * 1. Sends update metadata request to all live and shutting down brokers + * 2. Triggers the OnlinePartition state change for all new/offline partitions + * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. 
If * so, it performs the reassignment logic for each topic/partition. * * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons: @@ -400,10 +401,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt def onBrokerStartup(newBrokers: Seq[Int]) { info("New broker startup callback for %s".format(newBrokers.mkString(","))) val newBrokersSet = newBrokers.toSet -// send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown -// leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the -// metadata will reach the new brokers faster -sendUpdateMetadataRequest(newBrokers) +// send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new +// broker via this update. +// In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the +// common controlled shutdown case, the metadata will reach the new brokers faster + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet) @@ -433,6 +435,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * 1. Mark partitions with dead leaders as offline * 2. Triggers the OnlinePartition state change for all new/offline partitions * 3. Invokes the OfflineReplica state change on the input list of newly started brokers + * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers * * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.
This is because * the partition state machine will refresh our cache for us when performing leader election for all new/offline @@ -464,6 +467,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted) } + +// If broker failure did not require leader re-election, inform brokers of failed broker +// Note that during leader re-election, brokers update their metadata +if (partitionsWithoutLeader.isEmpty) { +