kafka git commit: KAFKA-5700; Producer should not drop header information when splitting batches
Repository: kafka Updated Branches: refs/heads/0.11.0 9b327fb79 -> 557001f9f KAFKA-5700; Producer should not drop header information when splitting batches Producer should not drop header information when splitting batches. This PR also corrects a minor typo in Sender.java, where `spitting and retrying` should be `splitting and retrying`. Author: huxihx Reviewers: Ismael Juma , Jiangjie Qin Closes #3620 from huxihx/KAFKA-5700 (cherry picked from commit 1cd86284e808e2846e94b312bb55141f6d216d51) Signed-off-by: Jiangjie Qin Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/557001f9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/557001f9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/557001f9 Branch: refs/heads/0.11.0 Commit: 557001f9f91bdb7679e889e3e62eeb3bac7f94c7 Parents: 9b327fb Author: huxihx Authored: Sun Aug 6 22:25:52 2017 -0700 Committer: Jiangjie Qin Committed: Sun Aug 6 22:28:06 2017 -0700 -- .../producer/internals/ProducerBatch.java | 2 +- .../clients/producer/internals/Sender.java | 2 +- .../producer/internals/ProducerBatchTest.java | 40 3 files changed, 42 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/557001f9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index fcdda8d..c6c3e90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -128,7 +128,7 @@ public final class ProducerBatch { return false; } else { // No need to get the CRC. 
-this.recordsBuilder.append(timestamp, key, value); +this.recordsBuilder.append(timestamp, key, value, headers); this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(), recordsBuilder.compressionType(), key, value, headers)); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, http://git-wip-us.apache.org/repos/asf/kafka/blob/557001f9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 38d8840..59c513b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -498,7 +498,7 @@ public class Sender implements Runnable { (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) { // If the batch is too large, we split the batch and send the split batches again. We do not decrement // the retry attempts in this case. -log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}", +log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, this.retries - batch.attempts(), http://git-wip-us.apache.org/repos/asf/kafka/blob/557001f9/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java -- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 2c7e4f9..41aa5c6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.re
kafka git commit: KAFKA-5700; Producer should not drop header information when splitting batches
Repository: kafka Updated Branches: refs/heads/trunk 1d291a421 -> 1cd86284e KAFKA-5700; Producer should not drop header information when splitting batches Producer should not drop header information when splitting batches. This PR also corrects a minor typo in Sender.java, where `spitting and retrying` should be `splitting and retrying`. Author: huxihx Reviewers: Ismael Juma , Jiangjie Qin Closes #3620 from huxihx/KAFKA-5700 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd86284 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd86284 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd86284 Branch: refs/heads/trunk Commit: 1cd86284e808e2846e94b312bb55141f6d216d51 Parents: 1d291a4 Author: huxihx Authored: Sun Aug 6 22:25:52 2017 -0700 Committer: Jiangjie Qin Committed: Sun Aug 6 22:25:52 2017 -0700 -- .../producer/internals/ProducerBatch.java | 2 +- .../clients/producer/internals/Sender.java | 2 +- .../producer/internals/ProducerBatchTest.java | 40 3 files changed, 42 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 53563ba..ee7d21a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -128,7 +128,7 @@ public final class ProducerBatch { return false; } else { // No need to get the CRC. 
-this.recordsBuilder.append(timestamp, key, value); +this.recordsBuilder.append(timestamp, key, value, headers); this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(), recordsBuilder.compressionType(), key, value, headers)); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e6d8bc5..8519c4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -498,7 +498,7 @@ public class Sender implements Runnable { (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) { // If the batch is too large, we split the batch and send the split batches again. We do not decrement // the retry attempts in this case. -log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}", +log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, this.retries - batch.attempts(), http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java -- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 2c7e4f9..41aa5c6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.LegacyRecord; import org.apache.kafka.common.record.MemoryRecords; @@ -31,6 +33,7 @@ import org
kafka git commit: KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests
Repository: kafka Updated Branches: refs/heads/0.11.0 b4f2e7289 -> 9b327fb79 KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging) Author: Paolo Patierno Reviewers: Ewen Cheslack-Postava Closes #3578 from ppatierno/kafka-5643 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b327fb7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b327fb7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b327fb7 Branch: refs/heads/0.11.0 Commit: 9b327fb79ed00a88037dd67206e9f6c091dc8022 Parents: b4f2e72 Author: Paolo Patierno Authored: Sun Aug 6 21:50:43 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Sun Aug 6 21:52:03 2017 -0700 -- tests/docker/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/9b327fb7/tests/docker/run_tests.sh -- diff --git a/tests/docker/run_tests.sh b/tests/docker/run_tests.sh index 329b556..6633e33 100755 --- a/tests/docker/run_tests.sh +++ b/tests/docker/run_tests.sh @@ -27,4 +27,4 @@ die() { if ${SCRIPT_DIR}/ducker-ak ssh | grep -q '(none)'; then ${SCRIPT_DIR}/ducker-ak up -n "${KAFKA_NUM_CONTAINERS}" || die "ducker-ak up failed" fi -${SCRIPT_DIR}/ducker-ak test ${TC_PATHS} || die "ducker-ak test failed" +${SCRIPT_DIR}/ducker-ak test ${TC_PATHS} ${_DUCKTAPE_OPTIONS} || die "ducker-ak test failed"
kafka git commit: KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests
Repository: kafka Updated Branches: refs/heads/0.10.2 0b6e38a2c -> 991ef411d KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging) Author: Paolo Patierno Reviewers: Ewen Cheslack-Postava Closes #3578 from ppatierno/kafka-5643 (cherry picked from commit 1d291a4219656cd9f597432ff9d7f8a42900abdb) Signed-off-by: Ewen Cheslack-Postava Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/991ef411 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/991ef411 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/991ef411 Branch: refs/heads/0.10.2 Commit: 991ef411de5a85ddaa95c57400cd752bf6ec85f2 Parents: 0b6e38a Author: Paolo Patierno Authored: Sun Aug 6 21:50:43 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Sun Aug 6 21:50:54 2017 -0700 -- tests/docker/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/991ef411/tests/docker/run_tests.sh -- diff --git a/tests/docker/run_tests.sh b/tests/docker/run_tests.sh index 329b556..6633e33 100755 --- a/tests/docker/run_tests.sh +++ b/tests/docker/run_tests.sh @@ -27,4 +27,4 @@ die() { if ${SCRIPT_DIR}/ducker-ak ssh | grep -q '(none)'; then ${SCRIPT_DIR}/ducker-ak up -n "${KAFKA_NUM_CONTAINERS}" || die "ducker-ak up failed" fi -${SCRIPT_DIR}/ducker-ak test ${TC_PATHS} || die "ducker-ak test failed" +${SCRIPT_DIR}/ducker-ak test ${TC_PATHS} ${_DUCKTAPE_OPTIONS} || die "ducker-ak test failed"
kafka git commit: KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests
Repository: kafka Updated Branches: refs/heads/trunk d637c134c -> 1d291a421 KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging) Author: Paolo Patierno Reviewers: Ewen Cheslack-Postava Closes #3578 from ppatierno/kafka-5643 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d291a42 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d291a42 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d291a42 Branch: refs/heads/trunk Commit: 1d291a4219656cd9f597432ff9d7f8a42900abdb Parents: d637c13 Author: Paolo Patierno Authored: Sun Aug 6 21:50:43 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Sun Aug 6 21:50:43 2017 -0700 -- tests/docker/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1d291a42/tests/docker/run_tests.sh -- diff --git a/tests/docker/run_tests.sh b/tests/docker/run_tests.sh index 329b556..6633e33 100755 --- a/tests/docker/run_tests.sh +++ b/tests/docker/run_tests.sh @@ -27,4 +27,4 @@ die() { if ${SCRIPT_DIR}/ducker-ak ssh | grep -q '(none)'; then ${SCRIPT_DIR}/ducker-ak up -n "${KAFKA_NUM_CONTAINERS}" || die "ducker-ak up failed" fi -${SCRIPT_DIR}/ducker-ak test ${TC_PATHS} || die "ducker-ak test failed" +${SCRIPT_DIR}/ducker-ak test ${TC_PATHS} ${_DUCKTAPE_OPTIONS} || die "ducker-ak test failed"