kafka git commit: KAFKA-2198: kafka-topics.sh exits with 0 status on failures; patched by Manikumar Reddy reviewed by Gwen Shapira

2015-07-13 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk bdbb9672f -> a7e0ac365


KAFKA-2198: kafka-topics.sh exits with 0 status on failures; patched by 
Manikumar Reddy reviewed by Gwen Shapira


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7e0ac36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7e0ac36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7e0ac36

Branch: refs/heads/trunk
Commit: a7e0ac3659c2b499124a866bc0b16b6b1b412376
Parents: bdbb967
Author: Manikumar Reddy manikumar.re...@gmail.com
Authored: Mon Jul 13 22:08:33 2015 -0700
Committer: Gwen Shapira csh...@gmail.com
Committed: Mon Jul 13 22:08:33 2015 -0700

--
 core/src/main/scala/kafka/admin/TopicCommand.scala | 17 +++--
 1 file changed, 11 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/a7e0ac36/core/src/main/scala/kafka/admin/TopicCommand.scala
--
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index a2ecb96..a90aa87 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils
 import kafka.coordinator.ConsumerCoordinator
 
 
-object TopicCommand {
+object TopicCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
 
@@ -48,7 +48,7 @@ object TopicCommand {
 opts.checkArgs()
 
 val zkClient = 
ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 3, 3)
-
+var exitCode = 0
 try {
   if(opts.options.has(opts.createOpt))
 createTopic(zkClient, opts)
@@ -62,11 +62,14 @@ object TopicCommand {
 deleteTopic(zkClient, opts)
 } catch {
  case e: Throwable =>
-println("Error while executing topic command " + e.getMessage)
-println(Utils.stackTrace(e))
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))
+exitCode = 1
 } finally {
   zkClient.close()
+  System.exit(exitCode)
 }
+
   }
 
   private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): 
Seq[String] = {
@@ -97,7 +100,8 @@ object TopicCommand {
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 val topics = getTopics(zkClient, opts)
 if (topics.length == 0) {
-  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
+  throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
+  opts.options.valueOf(opts.zkConnectOpt)))
 }
 topics.foreach { topic =>
   val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
@@ -138,7 +142,8 @@ object TopicCommand {
   def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
 val topics = getTopics(zkClient, opts)
 if (topics.length == 0) {
-  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
+  throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
+  opts.options.valueOf(opts.zkConnectOpt)))
 }
 topics.foreach { topic =>
   try {
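
For illustration, a minimal self-contained sketch of the exit-status pattern this patch introduces (not the actual TopicCommand code; runCommand and cleanup are hypothetical stand-ins): record a non-zero exit code when the command throws, still run cleanup in the finally block, and only then call System.exit so that shell scripts can detect the failure.

object ExitCodeSketch {
  def main(args: Array[String]): Unit = {
    var exitCode = 0
    try {
      runCommand(args)            // hypothetical stand-in for the create/alter/list/delete dispatch
    } catch {
      case e: Throwable =>
        println("Error while executing command : " + e.getMessage)
        exitCode = 1              // remember the failure instead of exiting with status 0
    } finally {
      cleanup()                   // the real tool closes its ZkClient here
      System.exit(exitCode)       // exit only after cleanup, non-zero on error
    }
  }

  private def runCommand(args: Array[String]): Unit =
    if (args.isEmpty) throw new IllegalArgumentException("no command given")

  private def cleanup(): Unit = () // placeholder for resource cleanup
}

A wrapper script can then branch on the exit status, for example: kafka-topics.sh --delete --topic test --zookeeper localhost:2181 || echo "delete failed".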



kafka git commit: KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang

2015-07-13 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 4aba4bc1d -> 69b451e28


KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen 
Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69b451e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69b451e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69b451e2

Branch: refs/heads/trunk
Commit: 69b451e28944deb162f7427105c3090f41c8797f
Parents: 4aba4bc
Author: Tim Brooks tbroo...@gmail.com
Authored: Mon Jul 13 13:13:02 2015 -0700
Committer: Guozhang Wang wangg...@gmail.com
Committed: Mon Jul 13 13:13:02 2015 -0700

--
 .../org/apache/kafka/clients/consumer/KafkaConsumer.java  | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/69b451e2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7aa0760..b4e8f7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.utils.Utils.min;
@@ -395,6 +396,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 private static final Logger log = 
LoggerFactory.getLogger(KafkaConsumer.class);
 private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
 private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+private static final long NO_CURRENT_THREAD = -1L;
 private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
 
 private final Coordinator coordinator;
@@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
 // currentThread holds the threadId of the current thread accessing 
KafkaConsumer
 // and is used to prevent multi-threaded access
-private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
+private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 // refcount is used to allow reentrant access by the thread who has 
acquired currentThread
 private final AtomicInteger refcount = new AtomicInteger(0);
 
@@ -1355,8 +1357,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
  */
 private void acquire() {
 ensureNotClosed();
-Long threadId = Thread.currentThread().getId();
-if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
+long threadId = Thread.currentThread().getId();
+if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
 throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
 refcount.incrementAndGet();
 }
@@ -1366,6 +1368,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
  */
 private void release() {
 if (refcount.decrementAndGet() == 0)
-currentThread.set(null);
+currentThread.set(NO_CURRENT_THREAD);
 }
 }
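
An illustrative sketch of the guard this patch switches to (class and method names are hypothetical, not the KafkaConsumer API): a plain AtomicLong with a -1 sentinel replaces AtomicReference<Long>, so the ownership check compares primitive thread ids without boxing a Long on every acquire, while the refcount still lets the owning thread re-enter.

import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

class SingleThreadGuard {
  private val NoCurrentThread = -1L                      // sentinel: no thread owns the guard
  private val currentThread = new AtomicLong(NoCurrentThread)
  private val refcount = new AtomicInteger(0)            // allows reentrant acquire by the owner

  def acquire(): Unit = {
    val threadId = Thread.currentThread().getId
    // Re-entry by the owning thread is allowed; any other thread must win the CAS on the sentinel.
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NoCurrentThread, threadId))
      throw new ConcurrentModificationException("not safe for multi-threaded access")
    refcount.incrementAndGet()
  }

  def release(): Unit = {
    // Only clear ownership when the outermost acquire has been released.
    if (refcount.decrementAndGet() == 0)
      currentThread.set(NoCurrentThread)
  }
}

In a client like this, each public method would wrap its body in acquire()/release(), typically in a try/finally, so nested internal calls from the same thread do not trip the guard.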



kafka git commit: kafka-972; MetadataRequest returns stale list of brokers; patched by Ashish Singh; reviewed by Jun Rao

2015-07-13 Thread junrao
Repository: kafka
Updated Branches:
  refs/heads/trunk 69b451e28 -> bdbb9672f


kafka-972; MetadataRequest returns stale list of brokers; patched by Ashish 
Singh; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdbb9672
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdbb9672
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdbb9672

Branch: refs/heads/trunk
Commit: bdbb9672f5e035fd00801037e2affe64811ec6ab
Parents: 69b451e
Author: Ashish Singh asi...@cloudera.com
Authored: Mon Jul 13 17:16:34 2015 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Mon Jul 13 17:16:34 2015 -0700

--
 .../kafka/controller/KafkaController.scala  | 21 +--
 .../kafka/integration/TopicMetadataTest.scala   | 66 ++--
 2 files changed, 77 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdbb9672/core/src/main/scala/kafka/controller/KafkaController.scala
--
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 20f1499..b4fc755 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -387,8 +387,9 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient, val brokerSt
   /**
* This callback is invoked by the replica state machine's broker change 
listener, with the list of newly started
* brokers as input. It does the following -
-   * 1. Triggers the OnlinePartition state change for all new/offline 
partitions
-   * 2. It checks whether there are reassigned replicas assigned to any newly 
started brokers.  If
+   * 1. Sends update metadata request to all live and shutting down brokers
+   * 2. Triggers the OnlinePartition state change for all new/offline 
partitions
+   * 3. It checks whether there are reassigned replicas assigned to any newly 
started brokers.  If
*so, it performs the reassignment logic for each topic/partition.
*
* Note that we don't need to refresh the leader/isr cache for all 
topic/partitions at this point for two reasons:
@@ -400,10 +401,11 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient, val brokerSt
   def onBrokerStartup(newBrokers: Seq[Int]) {
 info("New broker startup callback for %s".format(newBrokers.mkString(",")))
 val newBrokersSet = newBrokers.toSet
-// send update metadata request for all partitions to the newly restarted 
brokers. In cases of controlled shutdown
-// leaders will not be elected when a new broker comes up. So at least in 
the common controlled shutdown case, the
-// metadata will reach the new brokers faster
-sendUpdateMetadataRequest(newBrokers)
+// send update metadata request to all live and shutting down brokers. Old 
brokers will get to know of the new
+// broker via this update.
+// In cases of controlled shutdown leaders will not be elected when a new 
broker comes up. So at least in the
+// common controlled shutdown case, the metadata will reach the new 
brokers faster
+sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
 // the very first thing to do when a new broker comes up is send it the 
entire list of partitions that it is
 // supposed to host. Based on that the broker starts the high watermark 
threads for the input list of partitions
 val allReplicasOnNewBrokers = 
controllerContext.replicasOnBrokers(newBrokersSet)
@@ -433,6 +435,7 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient, val brokerSt
* 1. Mark partitions with dead leaders as offline
* 2. Triggers the OnlinePartition state change for all new/offline 
partitions
* 3. Invokes the OfflineReplica state change on the input list of newly 
started brokers
   * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
*
* Note that we don't need to refresh the leader/isr cache for all 
topic/partitions at this point.  This is because
* the partition state machine will refresh our cache for us when performing 
leader election for all new/offline
@@ -464,6 +467,12 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient, val brokerSt
   // since topic deletion cannot be retried until at least one replica is 
in TopicDeletionStarted state
   deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
 }
+
+// If broker failure did not require leader re-election, inform brokers of 
failed broker
+// Note that during leader re-election, brokers update their metadata
+if (partitionsWithoutLeader.isEmpty) {
+