kafka git commit: KAFKA-4438; Cross compile to Scala 2.12.0

2016-11-30 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 aec74ae5b -> 9ffadbfdc


KAFKA-4438; Cross compile to Scala 2.12.0

(cherry picked from commit f3aad3b54b7cbc5109d8398829a31100fd82b3e0)

Author: Bernard Leach 

Reviewers: Guozhang Wang 

Closes #2164 from leachbj/0.10.1-2.12-backport


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ffadbfd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ffadbfd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ffadbfd

Branch: refs/heads/0.10.1
Commit: 9ffadbfdc6f1b1fc47d48cbb7edf0f2f3a5f8f5e
Parents: aec74ae
Author: Bernard Leach 
Authored: Wed Nov 30 22:28:08 2016 -0800
Committer: Guozhang Wang 
Committed: Wed Nov 30 22:28:08 2016 -0800

--
 build.gradle   | 4 ++--
 core/src/main/scala/kafka/utils/IteratorTemplate.scala | 2 +-
 gradle/dependencies.gradle | 7 +--
 3 files changed, 8 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ffadbfd/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 5e0e234..fded48e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -313,7 +313,7 @@ task jacocoRootReport(type: 
org.gradle.testing.jacoco.tasks.JacocoReport) {
 
 task reportCoverage(dependsOn: ['jacocoRootReport', 'core:reportScoverage'])
 
-for ( sv in ['2_10', '2_11'] ) {
+for ( sv in ['2_10', '2_11', '2_12'] ) {
   String svInDot = sv.replaceAll( "_", ".")
 
   tasks.create(name: "jar_core_${sv}", type: GradleBuild) {
@@ -505,7 +505,7 @@ project(':core') {
 
   task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 
'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 
'genConsumerConfigDocs', 'genKafkaConfigDocs',
-   'genTopicConfigDocs', 
':connect:runtime:genConnectConfigDocs', 
+   'genTopicConfigDocs', 
':connect:runtime:genConnectConfigDocs',
':streams:genStreamsConfigDocs'], type: Tar) {
 classifier = 'site-docs'
 compression = Compression.GZIP

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ffadbfd/core/src/main/scala/kafka/utils/IteratorTemplate.scala
--
diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala 
b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
index 8c44955..17c152d 100644
--- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala
+++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
@@ -75,7 +75,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with 
java.util.Iterator[T
 null.asInstanceOf[T]
   }
   
-  def remove = 
+  override def remove =
 throw new UnsupportedOperationException("Removal not supported")
 
   protected def resetState() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ffadbfd/gradle/dependencies.gradle
--
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index ab1d057..02a4cd9 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -39,9 +39,9 @@ versions += [
   powermock: "1.6.4",
   reflections: "0.9.10",
   rocksDB: "4.9.0",
-  scalaTest: "2.2.6",
+  scalaTest: "3.0.1",
   scalaParserCombinators: "1.0.4",
-  scoverage: "1.1.1",
+  scoverage: "1.3.0",
   slf4j: "1.7.21",
   snappy: "1.1.2.6",
   zkclient: "0.9",
@@ -51,11 +51,14 @@ versions += [
 // Add Scala version
 def defaultScala210Version = '2.10.6'
 def defaultScala211Version = '2.11.8'
+def defaultScala212Version = '2.12.0'
 if (hasProperty('scalaVersion')) {
   if (scalaVersion == '2.10') {
 versions["scala"] = defaultScala210Version
   } else if (scalaVersion == '2.11') {
 versions["scala"] = defaultScala211Version
+  } else if (scalaVersion == '2.12') {
+versions["scala"] = defaultScala212Version
   } else {
 versions["scala"] = scalaVersion
   }



kafka git commit: KAFKA-3637: Added initial states

2016-11-30 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk f9d7808ba -> ea42d6535


KAFKA-3637: Added initial states

Author: Eno Thereska 

Reviewers: Ismael Juma, Dan Norwood, Xavier Léauté, Damian Guy, Michael G. 
Noll, Matthias J. Sax, Guozhang Wang

Closes #2135 from enothereska/KAFKA-3637-streams-state


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea42d653
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea42d653
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea42d653

Branch: refs/heads/trunk
Commit: ea42d65354b5905668d45dedae1cd1f7f39c888c
Parents: f9d7808
Author: Eno Thereska 
Authored: Wed Nov 30 22:23:31 2016 -0800
Committer: Guozhang Wang 
Committed: Wed Nov 30 22:23:31 2016 -0800

--
 .../org/apache/kafka/streams/KafkaStreams.java  | 144 ---
 .../processor/internals/StreamThread.java   | 140 +++---
 .../apache/kafka/streams/KafkaStreamsTest.java  |  28 +++-
 .../QueryableStateIntegrationTest.java  |   2 +
 .../processor/internals/StreamThreadTest.java   |  36 -
 5 files changed, 309 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/ea42d653/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6b35d24..df6da21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -46,6 +46,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -92,10 +95,6 @@ public class KafkaStreams {
 private static final Logger log = 
LoggerFactory.getLogger(KafkaStreams.class);
 private static final String JMX_PREFIX = "kafka.streams";
 public static final int DEFAULT_CLOSE_TIMEOUT = 0;
-
-private enum StreamsState { created, running, stopped }
-private StreamsState state = StreamsState.created;
-
 private final StreamThread[] threads;
 private final Metrics metrics;
 private final QueryableStoreProvider queryableStoreProvider;
@@ -109,6 +108,110 @@ public class KafkaStreams {
 
 private final StreamsConfig config;
 
+// container states
+/**
+ * Kafka Streams states are the possible state that a Kafka Streams 
instance can be in.
+ * An instance must only be in one state at a time.
+ * Note this instance will be in "Rebalancing" state if any of its threads 
is rebalancing
+ * The expected state transition with the following defined states is:
+ *
+ * +---+
+ * +<--|Created|
+ * |   +-+-+
+ * | |   +--+
+ * | v   |  |
+ * |   +-+---v--+--+
+ * +<- | Rebalancing   |<+
+ * |   +-+-+ ^
+ * | +--+|
+ * | |  ||
+ * |   +-+---v--+-+  |
+ * +-->|Running   |--+
+ * |   +-++
+ * | |
+ * | v
+ * | +---++
+ * +>|Pending |
+ *   |Shutdown|
+ *   +---++
+ *   |
+ *   v
+ * +-+-+
+ * |Not Running|
+ * +---+
+ */
+public enum State {
+CREATED(1, 2, 3), RUNNING(2, 3), REBALANCING(1, 2, 3), 
PENDING_SHUTDOWN(4), NOT_RUNNING;
+
+private final Set validTransitions = new HashSet<>();
+
+State(final Integer...validTransitions) {
+this.validTransitions.addAll(Arrays.asList(validTransitions));
+}
+
+public boolean isRunning() {
+return this.equals(RUNNING) || this.equals(REBALANCING);
+}
+public boolean isCreatedOrRunning() {
+return isRunning() || this.equals(CREATED);
+}
+public boolean isValidTransition(final State newState) {
+return validTransitions.contains(newState.ordinal());
+}
+}
+private volatile State state = KafkaStreams.State.CREATED;
+private StateListener stateListener = null;
+private final StreamStateListener 

kafka-site git commit: add Guozhang's sig-key

2016-11-30 Thread guozhang
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site e18293b6c -> 33c99ca54


add Guozhang's sig-key


Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/33c99ca5
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/33c99ca5
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/33c99ca5

Branch: refs/heads/asf-site
Commit: 33c99ca54dfbcaf52ca9b5b2ac9bdf2fcedcb7b9
Parents: e18293b
Author: Guozhang Wang 
Authored: Wed Nov 30 14:15:02 2016 -0800
Committer: Guozhang Wang 
Committed: Wed Nov 30 14:15:02 2016 -0800

--
 KEYS | 62 ++
 1 file changed, 62 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/33c99ca5/KEYS
--
diff --git a/KEYS b/KEYS
index 8d1eeff..b07dee0 100644
--- a/KEYS
+++ b/KEYS
@@ -98,6 +98,7 @@ 
kyHxwitfr4d8mm6GW3/YluGi7oPCnH3FEsIoUZMyfGs8XW/zwH26mCIpYpu9rDHh
 V/dDwg+iqlaqtN6rIS4E1gML3K33OVsdUvlcodhE
 =gNdQ
 -END PGP PUBLIC KEY BLOCK-
+
 -BEGIN PGP PUBLIC KEY BLOCK-
 Version: GnuPG v2.0.14 (GNU/Linux)
 
@@ -493,6 +494,7 @@ 
av/CRbFr8aRQGW5BEZ7r0WNGvCicEKobxS45qykkWyKn+urcmY4xE/EVxKxFdWQi
 whcLh0Ee99pc
 =4nQc
 -END PGP PUBLIC KEY BLOCK-
+
 -BEGIN PGP PUBLIC KEY BLOCK-
 Version: GnuPG v2
 
@@ -539,3 +541,63 @@ 
OGnD0b2wLdRLkD93whR4Fiiowd0STSwrKvN34/0FQj295u+efzIUQQTldoJ+VlGT
 /2qyGw==
 =pSZG
 -END PGP PUBLIC KEY BLOCK-
+
+pub   rsa4096/3B417B9B 2016-11-30
+uid [ultimate] Guozhang Wang (Key for signing code and releases) 

+sig 3    3B417B9B 2016-11-30  Guozhang Wang (Key for signing code and 
releases) 
+sub   rsa4096/B82BF2DB 2016-11-30
+sig  3B417B9B 2016-11-30  Guozhang Wang (Key for signing code and 
releases) 
+
+-BEGIN PGP PUBLIC KEY BLOCK-
+Version: GnuPG v2
+
+mQINBFg/SbQBEACpLMbB0yxZm1RtLAPHPrjX2E8bQo3pNFpWkgBlQ2X77If2RwaD
+gcHYdBuSHjFGgPuvWzSGMhMZpSHxRGmvS5lfrTPVZ6OrlhcO2JcmrQEYl4zGoQE3
+JcPPBBZUGixm9eQB2fwhBExBw/78ksdhrWEJ3gw0+a6f9MdYNf6FHraV0Jz/hPvY
+aF9vOTuUX90Iuwy2JgPu+pMMzIDMDhW2SoszYAU46yeUa2h7FJhNcWVvgTCzMZEp
+/RiGtKcPRj1eb2LnWzrPvbSIKPJ2eyD6ZOH22QVYyM4I+sDCz+bq1B7KOfF+11IV
+58GMhu7rTWurfIcHeJl55uNvRmrvc02ShfuedEzc2FeOuqqyyVTAhnicGVrxLow3
+0BLaOfQrvynFkiUO+N9A0fbQAkwjAvkf4pMbhTG+UAKaQU/2iYpg1XoClq/eKgmb
+3j0SxnKE+MQLswx98ciewnKlFjXHU9VKmFm/9O6mpgGSQ22BwwuLgrm1L2FmnT4U
+QabAG7PNkgglkZFYzmBe7dbOITIPuRfmPjrqtpQuv6mw4GreFHW24e4OMoRjUBaI
+saR3QFf3Oni4Zqc2ods4s4bxPbfQer7z2M6w+8Z42ZkzvNCzGgSftaUoHM+uIep4
+CLPrHjhMY1e6Q24KxjmcOyM29K/OR+nleEaPHLRokgqspF/Re/lkP5mnfwARAQAB
+tEZHdW96aGFuZyBXYW5nIChLZXkgZm9yIHNpZ25pbmcgY29kZSBhbmQgcmVsZWFz
+ZXMpIDx3YW5nZ3VvekBnbWFpbC5jb20+iQI3BBMBCgAhBQJYP0m0AhsDBQsJCAcD
+BRUKCQgLBRYCAwEAAh4BAheAAAoJEKbsqJU7QXubQgwQAIFRFlvbdHjph5GrP2EX
+dgjzibHU/1QzOk+VaSeIZKUk6m7Z5288R1tHbqhOyf2A1pWPS5fnzRG2x7cvMXge
+zf7pbsTyKwO111ndA25kyoF1yeW0f2ycfjdFVd0H3mrqkQkXI59QqGGvVoGbiaP7
+27YF7n/BQXkKIScEVh/NLvelhDxvOASdEkK2cY+Xn35SiDKUhYCy7Ex7UA5G60ed
+ygNVyx0onDydq8nX28UOranfTMPmwEAvBzPbs0RxWC89Jl4eqDmoRQ5c5MWsOe/T
+ezJfWo/8ZDd6N/MIffD1/8DGSiPUwfxiTTIun1MgMQYmVQQn152oJfnfyhirUzxa
+3yvh7vodBjAybOFSc3ewIgfcmkcOzR8iCdOxe3yvInlg89zRhPQcp/UkEsGMVC1i
+x4UNMG79/bA4sx7mS0a2WjN1OS0vMFdwwoB7+FcUjJzHmE/zc1nWvnwG7xPhijQv
+E/sJ+2C71q6uWUcqIeF5FUNz/GC1JjVobXeay1/GVB58YYN8gYMrm3voIoupbVQs
+ghxeTZIFS+haPikB/mqgE5TqIfV+rz1To0SmApdy5b3SmkycRXaAtlSxwZWc9AsP
+9Cn5y3TChrEYdjTt7MHUPZNPHVJrMyanqv+kgiNhhCfuIRwuMiBl1QrwNDEKXpS9
+TNWezAVNRjEXIdUOsBFcNcUcuQINBFg/SbQBEADB2nH9Kvz6XWyELdJeOg6ulKsh
+xvg+SSVJIhvvFO2IzcVes0I1W7rXrOJcrM6qs43MCiFibZdHt9eZjVQ0ALGeek1p
+bsBBPvyF2pofFowlrB9sEU/J9MOhqtCdPofYrt6aCqAS0nARZtO7KD5EIEEmjTK5
+A+gILJBvSbxpc4lXfkyH9ZHtaVm6qV8RrCLulqKPRUYTRg34lB/SjonCYEN6XYDa
+4FPBvJHVYPgoSh1fMn55nvCOuMoiAfJ8EwrY6V8UCwnL5R72twJiurX557JlFAFm
+NYH1OhNlaChsHxqR9fFxRPUazsVmx/KB5cw+d3f3EdB/wFK5gZV+Z52wQHiIEeJt
+Y1XNhWGITvyYv/TAmhvN5m4xFCvn/O0oUsrzD348Qk9Lq3oqFJlJg5IuwzokIvsf
+tjD7OviKxD4nQzWRkxq0ydA21ttP1+VKvRnFTDPZ1lK549HGrbk3NPi9/50eMoyI
+nEhLuJgUOFTicOnG9ouiBS/lAAWA+kDUdbNGmbZst749xBC0x5JKgXA81EjeD6GX
+A4nd03iiuJQeZdQ/Gs8yZTVy7HGH3vDldTQxtjualkF6uFChpOBwxodYYgM820fU
+VHNVY8y+gDtj7yGt/8j3CSIaI/uAhuco4rsM7icC3bISXHYWv1vMD0Tg8KVC+go7
+npkMLG6VNHhbPbk/vwARAQABiQIfBBgBCgAJBQJYP0m0AhsMAAoJEKbsqJU7QXub
+9xsQAIkcKw9J4WOpj7wM9oUgTy8gcdaSaMlzY147JPH8rPOJacsgDqWfvYmW3JJm
+kZqylpQ969vWLawuAq4km3TyCxXvZP3Y8m/z/1wXV/2kB2wyf3KwmVHh7PPSigqB
+NOzETxNoQQsmMWITUhRnsCReHPz9BXuKzrGsX/CxcSmIz3QXjsNOktor8FnawNUw
+hmFtv13Xl31fjEEfBOMpeH2DpLxNxRXPFCf38XTWGHl00I1HoO4riF219OLMFhgZ
+7OzOHO7NbETm5sv6RHxktDblAjllXXjaRGkFpUgZqcXui26QIItdWEAuRLy6QAHL
+3PzCMmBtJUCaoetaFAxXMWYZWWwnS/mt+Qqar+yCN9thFyHK1n1MUz2CZ3T7tW+G

kafka git commit: KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy

2016-11-30 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 b1f8fc7b6 -> aec74ae5b


KAFKA-4469; Fix consumer performance regression from inefficient list removal 
and copy

Author: Jason Gustafson 

Reviewers: Ismael Juma , Guozhang Wang 

Closes #2190 from hachikuji/KAFKA-4469

(cherry picked from commit f9d7808bab52a0a6fa879aaac0b1da80e0f33adb)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aec74ae5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aec74ae5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aec74ae5

Branch: refs/heads/0.10.1
Commit: aec74ae5b2cd1815fce0227b0f2773ca972457bf
Parents: b1f8fc7
Author: Jason Gustafson 
Authored: Wed Nov 30 13:18:04 2016 -0800
Committer: Jason Gustafson 
Committed: Wed Nov 30 13:33:50 2016 -0800

--
 .../clients/consumer/internals/Fetcher.java | 82 ++--
 1 file changed, 41 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/aec74ae5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bfc1a0b..fdcfc30 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -62,7 +62,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -417,25 +416,40 @@ public class Fetcher {
 int recordsRemaining = maxPollRecords;
 
 while (recordsRemaining > 0) {
-if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
 CompletedFetch completedFetch = completedFetches.poll();
 if (completedFetch == null)
 break;
 
 nextInLineRecords = parseFetchedData(completedFetch);
 } else {
-recordsRemaining -= append(drained, nextInLineRecords, 
recordsRemaining);
+TopicPartition partition = nextInLineRecords.partition;
+
+List> records = 
drainRecords(nextInLineRecords, recordsRemaining);
+if (!records.isEmpty()) {
+List> currentRecords = 
drained.get(partition);
+if (currentRecords == null) {
+drained.put(partition, records);
+} else {
+// this case shouldn't usually happen because we only 
send one fetch at a time per partition,
+// but it might conceivably happen in some rare cases 
(such as partition leader changes).
+// we have to copy to a new list because the old one 
may be immutable
+List> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
+newRecords.addAll(currentRecords);
+newRecords.addAll(records);
+drained.put(partition, newRecords);
+}
+recordsRemaining -= records.size();
+}
 }
 }
 
 return drained;
 }
 
-private int append(Map>> drained,
-   PartitionRecords partitionRecords,
-   int maxRecords) {
-if (partitionRecords.isEmpty())
-return 0;
+private List> drainRecords(PartitionRecords 
partitionRecords, int maxRecords) {
+if (partitionRecords.isDrained())
+return Collections.emptyList();
 
 if (!subscriptions.isAssigned(partitionRecords.partition)) {
 // this can happen when a rebalance happened before fetched 
records are returned to the consumer's poll call
@@ -448,22 +462,14 @@ public class Fetcher {
 log.debug("Not returning fetched records for assigned 
partition {} since it is no longer fetchable", partitionRecords.partition);
 } else if (partitionRecords.fetchOffset == position) {
 // we are ensured to have at least one record since we already 
checked for emptiness
-

kafka git commit: KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy

2016-11-30 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk 8d188c911 -> f9d7808ba


KAFKA-4469; Fix consumer performance regression from inefficient list removal 
and copy

Author: Jason Gustafson 

Reviewers: Ismael Juma , Guozhang Wang 

Closes #2190 from hachikuji/KAFKA-4469


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9d7808b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9d7808b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9d7808b

Branch: refs/heads/trunk
Commit: f9d7808bab52a0a6fa879aaac0b1da80e0f33adb
Parents: 8d188c9
Author: Jason Gustafson 
Authored: Wed Nov 30 13:18:04 2016 -0800
Committer: Jason Gustafson 
Committed: Wed Nov 30 13:18:04 2016 -0800

--
 .../clients/consumer/internals/Fetcher.java | 82 ++--
 1 file changed, 41 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9d7808b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 703ea29..e414fcb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,7 +61,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -416,25 +415,40 @@ public class Fetcher {
 int recordsRemaining = maxPollRecords;
 
 while (recordsRemaining > 0) {
-if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
 CompletedFetch completedFetch = completedFetches.poll();
 if (completedFetch == null)
 break;
 
 nextInLineRecords = parseFetchedData(completedFetch);
 } else {
-recordsRemaining -= append(drained, nextInLineRecords, 
recordsRemaining);
+TopicPartition partition = nextInLineRecords.partition;
+
+List> records = 
drainRecords(nextInLineRecords, recordsRemaining);
+if (!records.isEmpty()) {
+List> currentRecords = 
drained.get(partition);
+if (currentRecords == null) {
+drained.put(partition, records);
+} else {
+// this case shouldn't usually happen because we only 
send one fetch at a time per partition,
+// but it might conceivably happen in some rare cases 
(such as partition leader changes).
+// we have to copy to a new list because the old one 
may be immutable
+List> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
+newRecords.addAll(currentRecords);
+newRecords.addAll(records);
+drained.put(partition, newRecords);
+}
+recordsRemaining -= records.size();
+}
 }
 }
 
 return drained;
 }
 
-private int append(Map>> drained,
-   PartitionRecords partitionRecords,
-   int maxRecords) {
-if (partitionRecords.isEmpty())
-return 0;
+private List> drainRecords(PartitionRecords 
partitionRecords, int maxRecords) {
+if (partitionRecords.isDrained())
+return Collections.emptyList();
 
 if (!subscriptions.isAssigned(partitionRecords.partition)) {
 // this can happen when a rebalance happened before fetched 
records are returned to the consumer's poll call
@@ -447,22 +461,14 @@ public class Fetcher {
 log.debug("Not returning fetched records for assigned 
partition {} since it is no longer fetchable", partitionRecords.partition);
 } else if (partitionRecords.fetchOffset == position) {
 // we are ensured to have at least one record since we already 
checked for emptiness
-List> partRecords = 
partitionRecords.take(maxRecords);
+List> 

kafka git commit: KAFKA-4271: Fix the server start script for Windows 32-bit OS

2016-11-30 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 ee0ab7dd0 -> b1f8fc7b6


KAFKA-4271: Fix the server start script for Windows 32-bit OS

Without this fix the new consumer fails to run on a 32-bit Windows OS.

Author: Vahid Hashemian 

Reviewers: Jason Gustafson, Guozhang Wang

Closes #2189 from vahidhashemian/KAFKA-4271

(cherry picked from commit 8d188c9110f7f2a6f9f16f1340d9d254bf583741)
Signed-off-by: Guozhang Wang 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1f8fc7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1f8fc7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1f8fc7b

Branch: refs/heads/0.10.1
Commit: b1f8fc7b6e40f371ebf5617b5f7f8b7ca5693903
Parents: ee0ab7d
Author: Vahid Hashemian 
Authored: Wed Nov 30 12:19:23 2016 -0800
Committer: Guozhang Wang 
Committed: Wed Nov 30 12:19:35 2016 -0800

--
 bin/windows/kafka-server-start.bat | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1f8fc7b/bin/windows/kafka-server-start.bat
--
diff --git a/bin/windows/kafka-server-start.bat 
b/bin/windows/kafka-server-start.bat
index 315507e..1aa859c 100644
--- a/bin/windows/kafka-server-start.bat
+++ b/bin/windows/kafka-server-start.bat
@@ -24,7 +24,15 @@ IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
 set 
KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
 )
 IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
-set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
+rem detect OS architecture
+wmic os get osarchitecture | find /i "32-bit" >nul 2>&1
+IF NOT ERRORLEVEL 1 (
+rem 32-bit OS
+set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
+) ELSE (
+rem 64-bit OS
+set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
+)
 )
 %~dp0kafka-run-class.bat kafka.Kafka %*
 EndLocal



kafka git commit: KAFKA-4271: Fix the server start script for Windows 32-bit OS

2016-11-30 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 497e669dd -> 8d188c911


KAFKA-4271: Fix the server start script for Windows 32-bit OS

Without this fix the new consumer fails to run on a 32-bit Windows OS.

Author: Vahid Hashemian 

Reviewers: Jason Gustafson, Guozhang Wang

Closes #2189 from vahidhashemian/KAFKA-4271


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d188c91
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d188c91
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d188c91

Branch: refs/heads/trunk
Commit: 8d188c9110f7f2a6f9f16f1340d9d254bf583741
Parents: 497e669
Author: Vahid Hashemian 
Authored: Wed Nov 30 12:19:23 2016 -0800
Committer: Guozhang Wang 
Committed: Wed Nov 30 12:19:23 2016 -0800

--
 bin/windows/kafka-server-start.bat | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d188c91/bin/windows/kafka-server-start.bat
--
diff --git a/bin/windows/kafka-server-start.bat 
b/bin/windows/kafka-server-start.bat
index 315507e..1aa859c 100644
--- a/bin/windows/kafka-server-start.bat
+++ b/bin/windows/kafka-server-start.bat
@@ -24,7 +24,15 @@ IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
 set 
KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
 )
 IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
-set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
+rem detect OS architecture
+wmic os get osarchitecture | find /i "32-bit" >nul 2>&1
+IF NOT ERRORLEVEL 1 (
+rem 32-bit OS
+set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
+) ELSE (
+rem 64-bit OS
+set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
+)
 )
 %~dp0kafka-run-class.bat kafka.Kafka %*
 EndLocal



kafka git commit: KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat <gharatmayures...@gmail.com> and Sumant Tambe <suta...@yahoo.com>

2016-11-30 Thread jjkoshy
Repository: kafka
Updated Branches:
  refs/heads/trunk 1503f7603 -> 497e669dd


KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat 
 and Sumant Tambe 

The last patch submitted by MayureshGharat (back in Dec 15) has been rebased to 
the latest trunk. I took care of a couple of test failures (MetricsTest) along 
the way. jjkoshy , granders , avianey , you may be interested in this PR.

Author: Sumant Tambe 
Author: Mayuresh Gharat 
Author: MayureshGharat 

Reviewers: Joel Koshy 

Closes #1664 from sutambe/async-delete-topic


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/497e669d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/497e669d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/497e669d

Branch: refs/heads/trunk
Commit: 497e669dd806fc984f5dceaede4a1f40f4e77c48
Parents: 1503f76
Author: Mayuresh Gharat 
Authored: Wed Nov 30 10:40:31 2016 -0800
Committer: Joel Koshy 
Committed: Wed Nov 30 10:40:31 2016 -0800

--
 .../main/scala/kafka/cluster/Partition.scala|  5 +-
 .../main/scala/kafka/log/AbstractIndex.scala| 23 ++---
 core/src/main/scala/kafka/log/Log.scala | 31 ---
 core/src/main/scala/kafka/log/LogManager.scala  | 91 
 .../scala/kafka/server/ReplicaManager.scala | 17 ++--
 5 files changed, 114 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/cluster/Partition.scala
--
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 4d3fb56..44d6a77 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -144,12 +144,13 @@ class Partition(val topic: String,
   assignedReplicaMap.clear()
   inSyncReplicas = Set.empty[Replica]
   leaderReplicaIdOpt = None
+  val topicPartition = TopicAndPartition(topic, partitionId)
   try {
-logManager.deleteLog(TopicAndPartition(topic, partitionId))
+logManager.asyncDelete(topicPartition)
 removePartitionMetrics()
   } catch {
 case e: IOException =>
-  fatal("Error deleting the log for partition [%s,%d]".format(topic, 
partitionId), e)
+  fatal(s"Error deleting the log for partition $topicPartition", e)
   Runtime.getRuntime().halt(1)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/AbstractIndex.scala
--
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala 
b/core/src/main/scala/kafka/log/AbstractIndex.scala
index d594f18..77ef0f7 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -33,11 +33,11 @@ import scala.math.ceil
 /**
  * The abstract index class which holds entry format agnostic methods.
  *
- * @param _file The index file
+ * @param file The index file
  * @param baseOffset the base offset of the segment that this index is 
corresponding to.
  * @param maxIndexSize The maximum index size in bytes.
  */
-abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, 
val baseOffset: Long, val maxIndexSize: Int = -1)
+abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: 
Long, val maxIndexSize: Int = -1)
 extends Logging {
 
   protected def entrySize: Int
@@ -46,8 +46,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] 
var _file: File, val
 
   @volatile
   protected var mmap: MappedByteBuffer = {
-val newlyCreated = _file.createNewFile()
-val raf = new RandomAccessFile(_file, "rw")
+val newlyCreated = file.createNewFile()
+val raf = new RandomAccessFile(file, "rw")
 try {
   /* pre-allocate the file if necessary */
   if(newlyCreated) {
@@ -92,11 +92,6 @@ abstract class AbstractIndex[K, V](@volatile private[this] 
var _file: File, val
   def entries: Int = _entries
 
   /**
-   * The index file
-   */
-  def file: File = _file
-
-  /**
* Reset the size of the memory map and the underneath file. This is used in 
two kinds of cases: (1) in
* trimToValidSize() which is called at closing the segment or new segment 
being rolled; (2) at
* loading segments from disk or truncating back to an old segment where a 
new log segment became active;
@@ -104,7 +99,7 @@ abstract class AbstractIndex[K, V](@volatile private[this] 
var _file: File, val
*/
   def