kafka git commit: MINOR: Do not collect zk persistent data by default
Repository: kafka Updated Branches: refs/heads/trunk 1cd22ed33 -> df88d3be7 MINOR: Do not collect zk persistent data by default In the system tests' zookeeper service, it is overkill and space-intensive to collect zookeeper data logs by default. This minor patch turns off default collection. Author: Geoff Anderson Reviewers: Jun Rao Closes #504 from granders/minor-zk-change-log-collect Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df88d3be Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df88d3be Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df88d3be Branch: refs/heads/trunk Commit: df88d3be75396b48a762149af2f4bbcd60fe69b9 Parents: 1cd22ed Author: Geoff Anderson Authored: Wed Nov 11 18:45:24 2015 -0800 Committer: Jun Rao Committed: Wed Nov 11 18:45:24 2015 -0800 -- tests/kafkatest/services/zookeeper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/df88d3be/tests/kafkatest/services/zookeeper.py -- diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index a1f999e..cae4268 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -30,7 +30,7 @@ class ZookeeperService(Service): "collect_default": True}, "zk_data": { "path": "/mnt/zookeeper", -"collect_default": True} +"collect_default": False} } def __init__(self, context, num_nodes):
kafka git commit: KAFKA-2813; selector doesn't close socket connection on non-IOExceptions
Repository: kafka Updated Branches: refs/heads/trunk df88d3be7 -> 3fd168d95 KAFKA-2813; selector doesn't close socket connection on non-IOExceptions Patched Selector.poll() to close the connection on any exception. Author: Jun Rao Reviewers: Guozhang Wang, Gwen Shapira Closes #501 from junrao/KAFKA-2813 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3fd168d9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3fd168d9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3fd168d9 Branch: refs/heads/trunk Commit: 3fd168d9522d1ad25f5582fbe838cea15bdb525f Parents: df88d3b Author: Jun Rao Authored: Wed Nov 11 22:18:19 2015 -0800 Committer: Jun Rao Committed: Wed Nov 11 22:18:19 2015 -0800 -- .../apache/kafka/common/network/Selector.java | 20 +++- 1 file changed, 7 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/3fd168d9/clients/src/main/java/org/apache/kafka/common/network/Selector.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 34de616..639a2be 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -241,7 +241,6 @@ public class Selector implements Selectable { * @throws IllegalArgumentException If `timeout` is negative * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send - * @throws InvalidReceiveException If invalid data is received */ @Override public void poll(long timeout) throws IOException { @@ -284,16 +283,8 @@ public class Selector implements Selectable { /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; -try { 
-while ((networkReceive = channel.read()) != null) { -addToStagedReceives(channel, networkReceive); -} -} catch (InvalidReceiveException e) { -log.error("Invalid data received from " + channel.id() + " closing connection", e); -close(channel); -this.disconnected.add(channel.id()); -throw e; -} +while ((networkReceive = channel.read()) != null) +addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ @@ -310,9 +301,12 @@ public class Selector implements Selectable { close(channel); this.disconnected.add(channel.id()); } -} catch (IOException e) { +} catch (Exception e) { String desc = channel.socketDescription(); -log.debug("Connection with {} disconnected", desc, e); +if (e instanceof IOException) +log.debug("Connection with {} disconnected", desc, e); +else +log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); }
kafka git commit: KAFKA-2813; selector doesn't close socket connection on non-IOExceptions
Repository: kafka Updated Branches: refs/heads/0.9.0 555fab913 -> 8c370093d KAFKA-2813; selector doesn't close socket connection on non-IOExceptions Patched Selector.poll() to close the connection on any exception. Author: Jun Rao Reviewers: Guozhang Wang, Gwen Shapira Closes #501 from junrao/KAFKA-2813 (cherry picked from commit 3fd168d9522d1ad25f5582fbe838cea15bdb525f) Signed-off-by: Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c370093 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c370093 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c370093 Branch: refs/heads/0.9.0 Commit: 8c370093d1ee3b9ba97f85970ec30fbb26ff48ef Parents: 555fab9 Author: Jun Rao Authored: Wed Nov 11 22:18:19 2015 -0800 Committer: Jun Rao Committed: Wed Nov 11 22:18:32 2015 -0800 -- .../apache/kafka/common/network/Selector.java | 20 +++- 1 file changed, 7 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/8c370093/clients/src/main/java/org/apache/kafka/common/network/Selector.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 34de616..639a2be 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -241,7 +241,6 @@ public class Selector implements Selectable { * @throws IllegalArgumentException If `timeout` is negative * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send - * @throws InvalidReceiveException If invalid data is received */ @Override public void poll(long timeout) throws IOException { @@ -284,16 +283,8 @@ public class Selector implements Selectable { /* if channel is ready read from any connections that have readable data */ if (channel.ready() && 
key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; -try { -while ((networkReceive = channel.read()) != null) { -addToStagedReceives(channel, networkReceive); -} -} catch (InvalidReceiveException e) { -log.error("Invalid data received from " + channel.id() + " closing connection", e); -close(channel); -this.disconnected.add(channel.id()); -throw e; -} +while ((networkReceive = channel.read()) != null) +addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ @@ -310,9 +301,12 @@ public class Selector implements Selectable { close(channel); this.disconnected.add(channel.id()); } -} catch (IOException e) { +} catch (Exception e) { String desc = channel.socketDescription(); -log.debug("Connection with {} disconnected", desc, e); +if (e instanceof IOException) +log.debug("Connection with {} disconnected", desc, e); +else +log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); }
kafka git commit: KAFKA-2805; RecordAccumulator request timeout not enforced when all brokers are gone
Repository: kafka Updated Branches: refs/heads/0.9.0 1828b8e52 -> 39ea9d3b7 KAFKA-2805; RecordAccumulator request timeout not enforced when all brokers are gone Removed the check for expiring only those batches whose metadata is unavailable. Now the batches will be expired irrespective of whether the leader is available or not, as soon as they reach the request timeout threshold. Author: Mayuresh Gharat Reviewers: Jun Rao Closes #503 from MayureshGharat/kafka-2805 (cherry picked from commit 1cd22ed33f1e1f63d8cb63f68309e5d8f43be1e1) Signed-off-by: Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39ea9d3b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39ea9d3b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39ea9d3b Branch: refs/heads/0.9.0 Commit: 39ea9d3b70341d58dfc9c9fa837229046d483868 Parents: 1828b8e Author: Mayuresh Gharat Authored: Wed Nov 11 18:41:45 2015 -0800 Committer: Jun Rao Committed: Wed Nov 11 18:41:57 2015 -0800 -- .../producer/internals/RecordAccumulator.java | 21 +--- 1 file changed, 9 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/39ea9d3b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index db61121..d4a8a23 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -216,18 +216,15 @@ public final class RecordAccumulator { Iterator batchIterator = dq.iterator(); while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); -Node leader = cluster.leaderFor(topicAndPartition); -if (leader == null) { -// check if the batch is expired -if 
(batch.maybeExpire(requestTimeout, now, this.lingerMs)) { -expiredBatches.add(batch); -count++; -batchIterator.remove(); -deallocate(batch); -} else { -if (!batch.inRetry()) { -break; -} +// check if the batch is expired +if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { +expiredBatches.add(batch); +count++; +batchIterator.remove(); +deallocate(batch); +} else { +if (!batch.inRetry()) { +break; } } }
kafka git commit: KAFKA-2805; RecordAccumulator request timeout not enforced when all brokers are gone
Repository: kafka Updated Branches: refs/heads/trunk 124f73b17 -> 1cd22ed33 KAFKA-2805; RecordAccumulator request timeout not enforced when all brokers are gone Removed the check for expiring only those batches whose metadata is unavailable. Now the batches will be expired irrespective of whether the leader is available or not, as soon as they reach the request timeout threshold. Author: Mayuresh Gharat Reviewers: Jun Rao Closes #503 from MayureshGharat/kafka-2805 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd22ed3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd22ed3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd22ed3 Branch: refs/heads/trunk Commit: 1cd22ed33f1e1f63d8cb63f68309e5d8f43be1e1 Parents: 124f73b Author: Mayuresh Gharat Authored: Wed Nov 11 18:41:45 2015 -0800 Committer: Jun Rao Committed: Wed Nov 11 18:41:45 2015 -0800 -- .../producer/internals/RecordAccumulator.java | 21 +--- 1 file changed, 9 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd22ed3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index db61121..d4a8a23 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -216,18 +216,15 @@ public final class RecordAccumulator { Iterator batchIterator = dq.iterator(); while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); -Node leader = cluster.leaderFor(topicAndPartition); -if (leader == null) { -// check if the batch is expired -if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { -expiredBatches.add(batch); 
-count++; -batchIterator.remove(); -deallocate(batch); -} else { -if (!batch.inRetry()) { -break; -} +// check if the batch is expired +if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { +expiredBatches.add(batch); +count++; +batchIterator.remove(); +deallocate(batch); +} else { +if (!batch.inRetry()) { +break; } } }
kafka git commit: KAFKA-2807: Move ThroughputThrottler back to tools jar to fix upgrade tests.
Repository: kafka Updated Branches: refs/heads/trunk a8ccdc615 -> c6b8de4e6 KAFKA-2807: Move ThroughputThrottler back to tools jar to fix upgrade tests. Author: Ewen Cheslack-PostavaReviewers: Gwen Shapira Closes #499 from ewencp/kafka-2807-relocate-throughput-throttler Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6b8de4e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6b8de4e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6b8de4e Branch: refs/heads/trunk Commit: c6b8de4e6806d8f9f4af57e15f2a7f4170265c42 Parents: a8ccdc6 Author: Ewen Cheslack-Postava Authored: Wed Nov 11 15:55:12 2015 -0800 Committer: Gwen Shapira Committed: Wed Nov 11 15:55:12 2015 -0800 -- build.gradle| 45 +++--- .../kafka/common/utils/ThroughputThrottler.java | 141 --- .../connect/tools/VerifiableSourceTask.java | 2 +- settings.gradle | 2 +- .../apache/kafka/tools/ProducerPerformance.java | 1 - .../apache/kafka/tools/ThroughputThrottler.java | 141 +++ .../apache/kafka/tools/VerifiableProducer.java | 1 - 7 files changed, 166 insertions(+), 167 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/build.gradle -- diff --git a/build.gradle b/build.gradle index 70fdbcd..0ee6c41 100644 --- a/build.gradle +++ b/build.gradle @@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'] +def connectPkgs = ['connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools'] def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} @@ -321,7 +321,7 @@ project(':core') { standardOutput = new File('docs/kafka_config.html').newOutputStream() } - task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', 
':connect:runtime:genConnectConfigDocs'], type: Tar) { + task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect-runtime:genConnectConfigDocs'], type: Tar) { classifier = 'site-docs' compression = Compression.GZIP from project.file("../docs") @@ -342,16 +342,16 @@ project(':core') { from(project.siteDocsTar) { into("site-docs/") } from(project(':tools').jar) { into("libs/") } from(project(':tools').configurations.runtime) { into("libs/") } -from(project(':connect:api').jar) { into("libs/") } -from(project(':connect:api').configurations.runtime) { into("libs/") } -from(project(':connect:runtime').jar) { into("libs/") } -from(project(':connect:runtime').configurations.runtime) { into("libs/") } -from(project(':connect:json').jar) { into("libs/") } -from(project(':connect:json').configurations.runtime) { into("libs/") } -from(project(':connect:file').jar) { into("libs/") } -from(project(':connect:file').configurations.runtime) { into("libs/") } -from(project(':connect:tools').jar) { into("libs/") } -from(project(':connect:tools').configurations.runtime) { into("libs/") } +from(project(':connect-api').jar) { into("libs/") } +from(project(':connect-api').configurations.runtime) { into("libs/") } +from(project(':connect-runtime').jar) { into("libs/") } +from(project(':connect-runtime').configurations.runtime) { into("libs/") } +from(project(':connect-json').jar) { into("libs/") } +from(project(':connect-json').configurations.runtime) { into("libs/") } +from(project(':connect-file').jar) { into("libs/") } +from(project(':connect-file').configurations.runtime) { into("libs/") } +from(project(':connect-tools').jar) { into("libs/") } +from(project(':connect-tools').configurations.runtime) { into("libs/") } } jar { @@ -638,7 +638,7 @@ project(':log4j-appender') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':connect:api') { +project(':connect-api') { apply plugin: 'checkstyle' archivesBaseName = 
"connect-api" @@ -695,12 +695,12 @@ project(':connect:api') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':connect:json') { +project(':connect-json') { apply plugin: 'checkstyle' archivesBaseName = "connect-json" dependencies { -compile project(':connect:api') +compile project(':connect-api') compile "$slf4japi" compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" @@ -756,12 +756,12 @@
kafka git commit: KAFKA-2790: doc improvements
Repository: kafka Updated Branches: refs/heads/0.9.0 0ea17e959 -> 1828b8e52 KAFKA-2790: doc improvements Author: Gwen ShapiraReviewers: Jun Rao, Guozhang Wang Closes #491 from gwenshap/KAFKA-2790 (cherry picked from commit a8ccdc6154a1e10982cb80df82e8661903eb9ae5) Signed-off-by: Gwen Shapira Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1828b8e5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1828b8e5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1828b8e5 Branch: refs/heads/0.9.0 Commit: 1828b8e52dd83a757580964d03b0148f19ef11fe Parents: 0ea17e9 Author: Gwen Shapira Authored: Wed Nov 11 10:54:09 2015 -0800 Committer: Gwen Shapira Committed: Wed Nov 11 10:54:26 2015 -0800 -- .../apache/kafka/common/config/ConfigDef.java | 12 +- docs/api.html | 8 +- docs/documentation.html | 6 + docs/ops.html | 6 +- docs/quickstart.html| 6 +- docs/security.html | 133 ++- docs/upgrade.html | 3 +- 7 files changed, 156 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1828b8e5/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 2a5ebee..13fb829 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -384,14 +384,14 @@ public class ConfigDef { } }); StringBuilder b = new StringBuilder(); -b.append("\n"); +b.append("\n"); b.append("\n"); b.append("Name\n"); +b.append("Description\n"); b.append("Type\n"); b.append("Default\n"); b.append("Valid Values\n"); b.append("Importance\n"); -b.append("Description\n"); b.append("\n"); for (ConfigKey def : configs) { b.append("\n"); @@ -399,6 +399,9 @@ public class ConfigDef { b.append(def.name); b.append(""); b.append(""); +b.append(def.documentation); 
+b.append(""); +b.append(""); b.append(def.type.toString().toLowerCase()); b.append(""); b.append(""); @@ -418,12 +421,9 @@ public class ConfigDef { b.append(""); b.append(def.importance.toString().toLowerCase()); b.append(""); -b.append(""); -b.append(def.documentation); -b.append(""); b.append("\n"); } -b.append(""); +b.append(""); return b.toString(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1828b8e5/docs/api.html -- diff --git a/docs/api.html b/docs/api.html index 835bdf2..3aad872 100644 --- a/docs/api.html +++ b/docs/api.html @@ -15,21 +15,21 @@ limitations under the License. --> -We are in the process of rewritting the JVM clients for Kafka. As of 0.8.2 Kafka includes a newly rewritten Java producer. The next release will include an equivalent Java consumer. These new clients are meant to supplant the existing Scala clients, but for compatability they will co-exist for some time. These clients are available in a seperate jar with minimal dependencies, while the old Scala clients remain packaged with the server. +Apache Kafka includes new java clients (in the org.apache.kafka.clients package). These are meant to supplant the older Scala clients, but for compatability they will co-exist for some time. These clients are available in a seperate jar with minimal dependencies, while the old Scala clients remain packaged with the server. 2.1 Producer API -As of the 0.8.2 release we encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases): +We encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client. 
You can use this client by adding a dependency on the client jar