kafka git commit: KAFKA-4438; Cross compile to Scala 2.12.0
Repository: kafka Updated Branches: refs/heads/0.10.1 aec74ae5b -> 9ffadbfdc KAFKA-4438; Cross compile to Scala 2.12.0 (cherry picked from commit f3aad3b54b7cbc5109d8398829a31100fd82b3e0) Author: Bernard LeachReviewers: Guozhang Wang Closes #2164 from leachbj/0.10.1-2.12-backport Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ffadbfd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ffadbfd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ffadbfd Branch: refs/heads/0.10.1 Commit: 9ffadbfdc6f1b1fc47d48cbb7edf0f2f3a5f8f5e Parents: aec74ae Author: Bernard Leach Authored: Wed Nov 30 22:28:08 2016 -0800 Committer: Guozhang Wang Committed: Wed Nov 30 22:28:08 2016 -0800 -- build.gradle | 4 ++-- core/src/main/scala/kafka/utils/IteratorTemplate.scala | 2 +- gradle/dependencies.gradle | 7 +-- 3 files changed, 8 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/9ffadbfd/build.gradle -- diff --git a/build.gradle b/build.gradle index 5e0e234..fded48e 100644 --- a/build.gradle +++ b/build.gradle @@ -313,7 +313,7 @@ task jacocoRootReport(type: org.gradle.testing.jacoco.tasks.JacocoReport) { task reportCoverage(dependsOn: ['jacocoRootReport', 'core:reportScoverage']) -for ( sv in ['2_10', '2_11'] ) { +for ( sv in ['2_10', '2_11', '2_12'] ) { String svInDot = sv.replaceAll( "_", ".") tasks.create(name: "jar_core_${sv}", type: GradleBuild) { @@ -505,7 +505,7 @@ project(':core') { task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', - 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', + 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':streams:genStreamsConfigDocs'], type: Tar) { classifier = 'site-docs' compression = Compression.GZIP 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ffadbfd/core/src/main/scala/kafka/utils/IteratorTemplate.scala -- diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala index 8c44955..17c152d 100644 --- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala +++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala @@ -75,7 +75,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T null.asInstanceOf[T] } - def remove = + override def remove = throw new UnsupportedOperationException("Removal not supported") protected def resetState() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9ffadbfd/gradle/dependencies.gradle -- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index ab1d057..02a4cd9 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -39,9 +39,9 @@ versions += [ powermock: "1.6.4", reflections: "0.9.10", rocksDB: "4.9.0", - scalaTest: "2.2.6", + scalaTest: "3.0.1", scalaParserCombinators: "1.0.4", - scoverage: "1.1.1", + scoverage: "1.3.0", slf4j: "1.7.21", snappy: "1.1.2.6", zkclient: "0.9", @@ -51,11 +51,14 @@ versions += [ // Add Scala version def defaultScala210Version = '2.10.6' def defaultScala211Version = '2.11.8' +def defaultScala212Version = '2.12.0' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.10') { versions["scala"] = defaultScala210Version } else if (scalaVersion == '2.11') { versions["scala"] = defaultScala211Version + } else if (scalaVersion == '2.12') { +versions["scala"] = defaultScala212Version } else { versions["scala"] = scalaVersion }
kafka git commit: KAFKA-3637: Added initial states
Repository: kafka Updated Branches: refs/heads/trunk f9d7808ba -> ea42d6535 KAFKA-3637: Added initial states Author: Eno ThereskaReviewers: Ismael Juma, Dan Norwood, Xavier Léauté, Damian Guy, Michael G. Noll, Matthias J. Sax, Guozhang Wang Closes #2135 from enothereska/KAFKA-3637-streams-state Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea42d653 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea42d653 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea42d653 Branch: refs/heads/trunk Commit: ea42d65354b5905668d45dedae1cd1f7f39c888c Parents: f9d7808 Author: Eno Thereska Authored: Wed Nov 30 22:23:31 2016 -0800 Committer: Guozhang Wang Committed: Wed Nov 30 22:23:31 2016 -0800 -- .../org/apache/kafka/streams/KafkaStreams.java | 144 --- .../processor/internals/StreamThread.java | 140 +++--- .../apache/kafka/streams/KafkaStreamsTest.java | 28 +++- .../QueryableStateIntegrationTest.java | 2 + .../processor/internals/StreamThreadTest.java | 36 - 5 files changed, 309 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/ea42d653/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java -- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6b35d24..df6da21 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -46,6 +46,9 @@ import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; import java.util.concurrent.TimeUnit; /** @@ -92,10 +95,6 @@ public class KafkaStreams { private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class); private static final String JMX_PREFIX = "kafka.streams"; public static final int 
DEFAULT_CLOSE_TIMEOUT = 0; - -private enum StreamsState { created, running, stopped } -private StreamsState state = StreamsState.created; - private final StreamThread[] threads; private final Metrics metrics; private final QueryableStoreProvider queryableStoreProvider; @@ -109,6 +108,110 @@ public class KafkaStreams { private final StreamsConfig config; +// container states +/** + * Kafka Streams states are the possible state that a Kafka Streams instance can be in. + * An instance must only be in one state at a time. + * Note this instance will be in "Rebalancing" state if any of its threads is rebalancing + * The expected state transition with the following defined states is: + * + * +---+ + * +<--|Created| + * | +-+-+ + * | | +--+ + * | v | | + * | +-+---v--+--+ + * +<- | Rebalancing |<+ + * | +-+-+ ^ + * | +--+| + * | | || + * | +-+---v--+-+ | + * +-->|Running |--+ + * | +-++ + * | | + * | v + * | +---++ + * +>|Pending | + * |Shutdown| + * +---++ + * | + * v + * +-+-+ + * |Not Running| + * +---+ + */ +public enum State { +CREATED(1, 2, 3), RUNNING(2, 3), REBALANCING(1, 2, 3), PENDING_SHUTDOWN(4), NOT_RUNNING; + +private final Set validTransitions = new HashSet<>(); + +State(final Integer...validTransitions) { +this.validTransitions.addAll(Arrays.asList(validTransitions)); +} + +public boolean isRunning() { +return this.equals(RUNNING) || this.equals(REBALANCING); +} +public boolean isCreatedOrRunning() { +return isRunning() || this.equals(CREATED); +} +public boolean isValidTransition(final State newState) { +return validTransitions.contains(newState.ordinal()); +} +} +private volatile State state = KafkaStreams.State.CREATED; +private StateListener stateListener = null; +private final StreamStateListener
kafka-site git commit: add Guozhang's sig-key
Repository: kafka-site Updated Branches: refs/heads/asf-site e18293b6c -> 33c99ca54 add Guozhang's sig-key Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/33c99ca5 Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/33c99ca5 Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/33c99ca5 Branch: refs/heads/asf-site Commit: 33c99ca54dfbcaf52ca9b5b2ac9bdf2fcedcb7b9 Parents: e18293b Author: Guozhang WangAuthored: Wed Nov 30 14:15:02 2016 -0800 Committer: Guozhang Wang Committed: Wed Nov 30 14:15:02 2016 -0800 -- KEYS | 62 ++ 1 file changed, 62 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kafka-site/blob/33c99ca5/KEYS -- diff --git a/KEYS b/KEYS index 8d1eeff..b07dee0 100644 --- a/KEYS +++ b/KEYS @@ -98,6 +98,7 @@ kyHxwitfr4d8mm6GW3/YluGi7oPCnH3FEsIoUZMyfGs8XW/zwH26mCIpYpu9rDHh V/dDwg+iqlaqtN6rIS4E1gML3K33OVsdUvlcodhE =gNdQ -END PGP PUBLIC KEY BLOCK- + -BEGIN PGP PUBLIC KEY BLOCK- Version: GnuPG v2.0.14 (GNU/Linux) @@ -493,6 +494,7 @@ av/CRbFr8aRQGW5BEZ7r0WNGvCicEKobxS45qykkWyKn+urcmY4xE/EVxKxFdWQi whcLh0Ee99pc =4nQc -END PGP PUBLIC KEY BLOCK- + -BEGIN PGP PUBLIC KEY BLOCK- Version: GnuPG v2 @@ -539,3 +541,63 @@ OGnD0b2wLdRLkD93whR4Fiiowd0STSwrKvN34/0FQj295u+efzIUQQTldoJ+VlGT /2qyGw== =pSZG -END PGP PUBLIC KEY BLOCK- + +pub rsa4096/3B417B9B 2016-11-30 +uid [ultimate] Guozhang Wang (Key for signing code and releases) +sig 33B417B9B 2016-11-30 Guozhang Wang (Key for signing code and releases) +sub rsa4096/B82BF2DB 2016-11-30 +sig 3B417B9B 2016-11-30 Guozhang Wang (Key for signing code and releases) + +-BEGIN PGP PUBLIC KEY BLOCK- +Version: GnuPG v2 + +mQINBFg/SbQBEACpLMbB0yxZm1RtLAPHPrjX2E8bQo3pNFpWkgBlQ2X77If2RwaD +gcHYdBuSHjFGgPuvWzSGMhMZpSHxRGmvS5lfrTPVZ6OrlhcO2JcmrQEYl4zGoQE3 +JcPPBBZUGixm9eQB2fwhBExBw/78ksdhrWEJ3gw0+a6f9MdYNf6FHraV0Jz/hPvY +aF9vOTuUX90Iuwy2JgPu+pMMzIDMDhW2SoszYAU46yeUa2h7FJhNcWVvgTCzMZEp 
+/RiGtKcPRj1eb2LnWzrPvbSIKPJ2eyD6ZOH22QVYyM4I+sDCz+bq1B7KOfF+11IV +58GMhu7rTWurfIcHeJl55uNvRmrvc02ShfuedEzc2FeOuqqyyVTAhnicGVrxLow3 +0BLaOfQrvynFkiUO+N9A0fbQAkwjAvkf4pMbhTG+UAKaQU/2iYpg1XoClq/eKgmb +3j0SxnKE+MQLswx98ciewnKlFjXHU9VKmFm/9O6mpgGSQ22BwwuLgrm1L2FmnT4U +QabAG7PNkgglkZFYzmBe7dbOITIPuRfmPjrqtpQuv6mw4GreFHW24e4OMoRjUBaI +saR3QFf3Oni4Zqc2ods4s4bxPbfQer7z2M6w+8Z42ZkzvNCzGgSftaUoHM+uIep4 +CLPrHjhMY1e6Q24KxjmcOyM29K/OR+nleEaPHLRokgqspF/Re/lkP5mnfwARAQAB +tEZHdW96aGFuZyBXYW5nIChLZXkgZm9yIHNpZ25pbmcgY29kZSBhbmQgcmVsZWFz +ZXMpIDx3YW5nZ3VvekBnbWFpbC5jb20+iQI3BBMBCgAhBQJYP0m0AhsDBQsJCAcD +BRUKCQgLBRYCAwEAAh4BAheAAAoJEKbsqJU7QXubQgwQAIFRFlvbdHjph5GrP2EX +dgjzibHU/1QzOk+VaSeIZKUk6m7Z5288R1tHbqhOyf2A1pWPS5fnzRG2x7cvMXge +zf7pbsTyKwO111ndA25kyoF1yeW0f2ycfjdFVd0H3mrqkQkXI59QqGGvVoGbiaP7 +27YF7n/BQXkKIScEVh/NLvelhDxvOASdEkK2cY+Xn35SiDKUhYCy7Ex7UA5G60ed +ygNVyx0onDydq8nX28UOranfTMPmwEAvBzPbs0RxWC89Jl4eqDmoRQ5c5MWsOe/T +ezJfWo/8ZDd6N/MIffD1/8DGSiPUwfxiTTIun1MgMQYmVQQn152oJfnfyhirUzxa +3yvh7vodBjAybOFSc3ewIgfcmkcOzR8iCdOxe3yvInlg89zRhPQcp/UkEsGMVC1i +x4UNMG79/bA4sx7mS0a2WjN1OS0vMFdwwoB7+FcUjJzHmE/zc1nWvnwG7xPhijQv +E/sJ+2C71q6uWUcqIeF5FUNz/GC1JjVobXeay1/GVB58YYN8gYMrm3voIoupbVQs +ghxeTZIFS+haPikB/mqgE5TqIfV+rz1To0SmApdy5b3SmkycRXaAtlSxwZWc9AsP +9Cn5y3TChrEYdjTt7MHUPZNPHVJrMyanqv+kgiNhhCfuIRwuMiBl1QrwNDEKXpS9 +TNWezAVNRjEXIdUOsBFcNcUcuQINBFg/SbQBEADB2nH9Kvz6XWyELdJeOg6ulKsh +xvg+SSVJIhvvFO2IzcVes0I1W7rXrOJcrM6qs43MCiFibZdHt9eZjVQ0ALGeek1p +bsBBPvyF2pofFowlrB9sEU/J9MOhqtCdPofYrt6aCqAS0nARZtO7KD5EIEEmjTK5 +A+gILJBvSbxpc4lXfkyH9ZHtaVm6qV8RrCLulqKPRUYTRg34lB/SjonCYEN6XYDa +4FPBvJHVYPgoSh1fMn55nvCOuMoiAfJ8EwrY6V8UCwnL5R72twJiurX557JlFAFm +NYH1OhNlaChsHxqR9fFxRPUazsVmx/KB5cw+d3f3EdB/wFK5gZV+Z52wQHiIEeJt +Y1XNhWGITvyYv/TAmhvN5m4xFCvn/O0oUsrzD348Qk9Lq3oqFJlJg5IuwzokIvsf +tjD7OviKxD4nQzWRkxq0ydA21ttP1+VKvRnFTDPZ1lK549HGrbk3NPi9/50eMoyI +nEhLuJgUOFTicOnG9ouiBS/lAAWA+kDUdbNGmbZst749xBC0x5JKgXA81EjeD6GX +A4nd03iiuJQeZdQ/Gs8yZTVy7HGH3vDldTQxtjualkF6uFChpOBwxodYYgM820fU 
+VHNVY8y+gDtj7yGt/8j3CSIaI/uAhuco4rsM7icC3bISXHYWv1vMD0Tg8KVC+go7 +npkMLG6VNHhbPbk/vwARAQABiQIfBBgBCgAJBQJYP0m0AhsMAAoJEKbsqJU7QXub +9xsQAIkcKw9J4WOpj7wM9oUgTy8gcdaSaMlzY147JPH8rPOJacsgDqWfvYmW3JJm +kZqylpQ969vWLawuAq4km3TyCxXvZP3Y8m/z/1wXV/2kB2wyf3KwmVHh7PPSigqB +NOzETxNoQQsmMWITUhRnsCReHPz9BXuKzrGsX/CxcSmIz3QXjsNOktor8FnawNUw +hmFtv13Xl31fjEEfBOMpeH2DpLxNxRXPFCf38XTWGHl00I1HoO4riF219OLMFhgZ +7OzOHO7NbETm5sv6RHxktDblAjllXXjaRGkFpUgZqcXui26QIItdWEAuRLy6QAHL +3PzCMmBtJUCaoetaFAxXMWYZWWwnS/mt+Qqar+yCN9thFyHK1n1MUz2CZ3T7tW+G
kafka git commit: KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy
Repository: kafka Updated Branches: refs/heads/0.10.1 b1f8fc7b6 -> aec74ae5b KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy Author: Jason GustafsonReviewers: Ismael Juma , Guozhang Wang Closes #2190 from hachikuji/KAFKA-4469 (cherry picked from commit f9d7808bab52a0a6fa879aaac0b1da80e0f33adb) Signed-off-by: Jason Gustafson Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aec74ae5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aec74ae5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aec74ae5 Branch: refs/heads/0.10.1 Commit: aec74ae5b2cd1815fce0227b0f2773ca972457bf Parents: b1f8fc7 Author: Jason Gustafson Authored: Wed Nov 30 13:18:04 2016 -0800 Committer: Jason Gustafson Committed: Wed Nov 30 13:33:50 2016 -0800 -- .../clients/consumer/internals/Fetcher.java | 82 ++-- 1 file changed, 41 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/aec74ae5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index bfc1a0b..fdcfc30 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -62,7 +62,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; @@ -417,25 +416,40 @@ public class Fetcher { int recordsRemaining = maxPollRecords; while (recordsRemaining > 0) { -if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { +if (nextInLineRecords == null || nextInLineRecords.isDrained()) { CompletedFetch completedFetch = 
completedFetches.poll(); if (completedFetch == null) break; nextInLineRecords = parseFetchedData(completedFetch); } else { -recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); +TopicPartition partition = nextInLineRecords.partition; + +List > records = drainRecords(nextInLineRecords, recordsRemaining); +if (!records.isEmpty()) { +List > currentRecords = drained.get(partition); +if (currentRecords == null) { +drained.put(partition, records); +} else { +// this case shouldn't usually happen because we only send one fetch at a time per partition, +// but it might conceivably happen in some rare cases (such as partition leader changes). +// we have to copy to a new list because the old one may be immutable +List > newRecords = new ArrayList<>(records.size() + currentRecords.size()); +newRecords.addAll(currentRecords); +newRecords.addAll(records); +drained.put(partition, newRecords); +} +recordsRemaining -= records.size(); +} } } return drained; } -private int append(Map >> drained, - PartitionRecords partitionRecords, - int maxRecords) { -if (partitionRecords.isEmpty()) -return 0; +private List > drainRecords(PartitionRecords partitionRecords, int maxRecords) { +if (partitionRecords.isDrained()) +return Collections.emptyList(); if (!subscriptions.isAssigned(partitionRecords.partition)) { // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call @@ -448,22 +462,14 @@ public class Fetcher { log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition); } else if (partitionRecords.fetchOffset == position) { // we are ensured to have at least one record since we already checked for emptiness -
kafka git commit: KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy
Repository: kafka Updated Branches: refs/heads/trunk 8d188c911 -> f9d7808ba KAFKA-4469; Fix consumer performance regression from inefficient list removal and copy Author: Jason GustafsonReviewers: Ismael Juma , Guozhang Wang Closes #2190 from hachikuji/KAFKA-4469 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9d7808b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9d7808b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9d7808b Branch: refs/heads/trunk Commit: f9d7808bab52a0a6fa879aaac0b1da80e0f33adb Parents: 8d188c9 Author: Jason Gustafson Authored: Wed Nov 30 13:18:04 2016 -0800 Committer: Jason Gustafson Committed: Wed Nov 30 13:18:04 2016 -0800 -- .../clients/consumer/internals/Fetcher.java | 82 ++-- 1 file changed, 41 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/f9d7808b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 703ea29..e414fcb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -61,7 +61,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; @@ -416,25 +415,40 @@ public class Fetcher { int recordsRemaining = maxPollRecords; while (recordsRemaining > 0) { -if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { +if (nextInLineRecords == null || nextInLineRecords.isDrained()) { CompletedFetch completedFetch = completedFetches.poll(); if (completedFetch == null) break; nextInLineRecords = 
parseFetchedData(completedFetch); } else { -recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); +TopicPartition partition = nextInLineRecords.partition; + +List > records = drainRecords(nextInLineRecords, recordsRemaining); +if (!records.isEmpty()) { +List > currentRecords = drained.get(partition); +if (currentRecords == null) { +drained.put(partition, records); +} else { +// this case shouldn't usually happen because we only send one fetch at a time per partition, +// but it might conceivably happen in some rare cases (such as partition leader changes). +// we have to copy to a new list because the old one may be immutable +List > newRecords = new ArrayList<>(records.size() + currentRecords.size()); +newRecords.addAll(currentRecords); +newRecords.addAll(records); +drained.put(partition, newRecords); +} +recordsRemaining -= records.size(); +} } } return drained; } -private int append(Map >> drained, - PartitionRecords partitionRecords, - int maxRecords) { -if (partitionRecords.isEmpty()) -return 0; +private List > drainRecords(PartitionRecords partitionRecords, int maxRecords) { +if (partitionRecords.isDrained()) +return Collections.emptyList(); if (!subscriptions.isAssigned(partitionRecords.partition)) { // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call @@ -447,22 +461,14 @@ public class Fetcher { log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition); } else if (partitionRecords.fetchOffset == position) { // we are ensured to have at least one record since we already checked for emptiness -List > partRecords = partitionRecords.take(maxRecords); +List >
kafka git commit: KAFKA-4271: Fix the server start script for Windows 32-bit OS
Repository: kafka Updated Branches: refs/heads/0.10.1 ee0ab7dd0 -> b1f8fc7b6 KAFKA-4271: Fix the server start script for Windows 32-bit OS Without this fix the new consumer fails to run on a 32-bit Windows OS. Author: Vahid Hashemian Reviewers: Jason Gustafson, Guozhang Wang Closes #2189 from vahidhashemian/KAFKA-4271 (cherry picked from commit 8d188c9110f7f2a6f9f16f1340d9d254bf583741) Signed-off-by: Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1f8fc7b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1f8fc7b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1f8fc7b Branch: refs/heads/0.10.1 Commit: b1f8fc7b6e40f371ebf5617b5f7f8b7ca5693903 Parents: ee0ab7d Author: Vahid Hashemian Authored: Wed Nov 30 12:19:23 2016 -0800 Committer: Guozhang Wang Committed: Wed Nov 30 12:19:35 2016 -0800 -- bin/windows/kafka-server-start.bat | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/b1f8fc7b/bin/windows/kafka-server-start.bat -- diff --git a/bin/windows/kafka-server-start.bat b/bin/windows/kafka-server-start.bat index 315507e..1aa859c 100644 --- a/bin/windows/kafka-server-start.bat +++ b/bin/windows/kafka-server-start.bat @@ -24,7 +24,15 @@ IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] ( set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties ) IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( -set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G +rem detect OS architecture +wmic os get osarchitecture | find /i "32-bit" >nul 2>&1 +IF NOT ERRORLEVEL 1 ( +rem 32-bit OS +set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M +) ELSE ( +rem 64-bit OS +set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G +) ) %~dp0kafka-run-class.bat kafka.Kafka %* EndLocal
kafka git commit: KAFKA-4271: Fix the server start script for Windows 32-bit OS
Repository: kafka Updated Branches: refs/heads/trunk 497e669dd -> 8d188c911 KAFKA-4271: Fix the server start script for Windows 32-bit OS Without this fix the new consumer fails to run on a 32-bit Windows OS. Author: Vahid Hashemian Reviewers: Jason Gustafson, Guozhang Wang Closes #2189 from vahidhashemian/KAFKA-4271 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d188c91 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d188c91 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d188c91 Branch: refs/heads/trunk Commit: 8d188c9110f7f2a6f9f16f1340d9d254bf583741 Parents: 497e669 Author: Vahid Hashemian Authored: Wed Nov 30 12:19:23 2016 -0800 Committer: Guozhang Wang Committed: Wed Nov 30 12:19:23 2016 -0800 -- bin/windows/kafka-server-start.bat | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/8d188c91/bin/windows/kafka-server-start.bat -- diff --git a/bin/windows/kafka-server-start.bat b/bin/windows/kafka-server-start.bat index 315507e..1aa859c 100644 --- a/bin/windows/kafka-server-start.bat +++ b/bin/windows/kafka-server-start.bat @@ -24,7 +24,15 @@ IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] ( set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties ) IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( -set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G +rem detect OS architecture +wmic os get osarchitecture | find /i "32-bit" >nul 2>&1 +IF NOT ERRORLEVEL 1 ( +rem 32-bit OS +set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M +) ELSE ( +rem 64-bit OS +set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G +) ) %~dp0kafka-run-class.bat kafka.Kafka %* EndLocal
kafka git commit: KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat <gharatmayures...@gmail.com> and Sumant Tambe <suta...@yahoo.com>
Repository: kafka Updated Branches: refs/heads/trunk 1503f7603 -> 497e669dd KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat and Sumant Tambe The last patch submitted by MayureshGharat (back in Dec 15) has been rebased to the latest trunk. I took care of a couple of test failures (MetricsTest) along the way. jjkoshy, granders, avianey, you may be interested in this PR. Author: Sumant Tambe Author: Mayuresh Gharat Author: MayureshGharat Reviewers: Joel Koshy Closes #1664 from sutambe/async-delete-topic Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/497e669d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/497e669d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/497e669d Branch: refs/heads/trunk Commit: 497e669dd806fc984f5dceaede4a1f40f4e77c48 Parents: 1503f76 Author: Mayuresh Gharat Authored: Wed Nov 30 10:40:31 2016 -0800 Committer: Joel Koshy Committed: Wed Nov 30 10:40:31 2016 -0800 -- .../main/scala/kafka/cluster/Partition.scala| 5 +- .../main/scala/kafka/log/AbstractIndex.scala| 23 ++--- core/src/main/scala/kafka/log/Log.scala | 31 --- core/src/main/scala/kafka/log/LogManager.scala | 91 .../scala/kafka/server/ReplicaManager.scala | 17 ++-- 5 files changed, 114 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/cluster/Partition.scala -- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 4d3fb56..44d6a77 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -144,12 +144,13 @@ class Partition(val topic: String, assignedReplicaMap.clear() inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None + val topicPartition = TopicAndPartition(topic, partitionId) try { -logManager.deleteLog(TopicAndPartition(topic, partitionId)) 
+logManager.asyncDelete(topicPartition) removePartitionMetrics() } catch { case e: IOException => - fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e) + fatal(s"Error deleting the log for partition $topicPartition", e) Runtime.getRuntime().halt(1) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/AbstractIndex.scala -- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index d594f18..77ef0f7 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -33,11 +33,11 @@ import scala.math.ceil /** * The abstract index class which holds entry format agnostic methods. * - * @param _file The index file + * @param file The index file * @param baseOffset the base offset of the segment that this index is corresponding to. * @param maxIndexSize The maximum index size in bytes. */ -abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) +abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { protected def entrySize: Int @@ -46,8 +46,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val @volatile protected var mmap: MappedByteBuffer = { -val newlyCreated = _file.createNewFile() -val raf = new RandomAccessFile(_file, "rw") +val newlyCreated = file.createNewFile() +val raf = new RandomAccessFile(file, "rw") try { /* pre-allocate the file if necessary */ if(newlyCreated) { @@ -92,11 +92,6 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val def entries: Int = _entries /** - * The index file - */ - def file: File = _file - - /** * Reset the size of the memory map and the underneath file. 
This is used in two kinds of cases: (1) in * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at * loading segments from disk or truncating back to an old segment where a new log segment became active; @@ -104,7 +99,7 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val */ def