kafka git commit: KAFKA-5700; Producer should not drop header information when splitting batches

2017-08-06 Thread jqin
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 9b327fb79 -> 557001f9f


KAFKA-5700; Producer should not drop header information when splitting batches

Producer should not drop header information when splitting batches.  This PR 
also corrects a minor typo in Sender.java, where `spitting and retrying` should 
be `splitting and retrying`.

Author: huxihx 

Reviewers: Ismael Juma, Jiangjie Qin

Closes #3620 from huxihx/KAFKA-5700

(cherry picked from commit 1cd86284e808e2846e94b312bb55141f6d216d51)
Signed-off-by: Jiangjie Qin 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/557001f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/557001f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/557001f9

Branch: refs/heads/0.11.0
Commit: 557001f9f91bdb7679e889e3e62eeb3bac7f94c7
Parents: 9b327fb
Author: huxihx 
Authored: Sun Aug 6 22:25:52 2017 -0700
Committer: Jiangjie Qin 
Committed: Sun Aug 6 22:28:06 2017 -0700

--
 .../producer/internals/ProducerBatch.java   |  2 +-
 .../clients/producer/internals/Sender.java  |  2 +-
 .../producer/internals/ProducerBatchTest.java   | 40 
 3 files changed, 42 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/557001f9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index fcdda8d..c6c3e90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -128,7 +128,7 @@ public final class ProducerBatch {
 return false;
 } else {
 // No need to get the CRC.
-this.recordsBuilder.append(timestamp, key, value);
+this.recordsBuilder.append(timestamp, key, value, headers);
 this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
 recordsBuilder.compressionType(), key, value, headers));
 FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,

http://git-wip-us.apache.org/repos/asf/kafka/blob/557001f9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 38d8840..59c513b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -498,7 +498,7 @@ public class Sender implements Runnable {
 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
 // If the batch is too large, we split the batch and send the split batches again. We do not decrement
 // the retry attempts in this case.
-log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}",
+log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
  correlationId,
  batch.topicPartition,
  this.retries - batch.attempts(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/557001f9/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
--
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 2c7e4f9..41aa5c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import 

kafka git commit: KAFKA-5700; Producer should not drop header information when splitting batches

2017-08-06 Thread jqin
Repository: kafka
Updated Branches:
  refs/heads/trunk 1d291a421 -> 1cd86284e


KAFKA-5700; Producer should not drop header information when splitting batches

Producer should not drop header information when splitting batches.  This PR 
also corrects a minor typo in Sender.java, where `spitting and retrying` should 
be `splitting and retrying`.

Author: huxihx 

Reviewers: Ismael Juma, Jiangjie Qin

Closes #3620 from huxihx/KAFKA-5700


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd86284
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd86284
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd86284

Branch: refs/heads/trunk
Commit: 1cd86284e808e2846e94b312bb55141f6d216d51
Parents: 1d291a4
Author: huxihx 
Authored: Sun Aug 6 22:25:52 2017 -0700
Committer: Jiangjie Qin 
Committed: Sun Aug 6 22:25:52 2017 -0700

--
 .../producer/internals/ProducerBatch.java   |  2 +-
 .../clients/producer/internals/Sender.java  |  2 +-
 .../producer/internals/ProducerBatchTest.java   | 40 
 3 files changed, 42 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 53563ba..ee7d21a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -128,7 +128,7 @@ public final class ProducerBatch {
 return false;
 } else {
 // No need to get the CRC.
-this.recordsBuilder.append(timestamp, key, value);
+this.recordsBuilder.append(timestamp, key, value, headers);
 this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
 recordsBuilder.compressionType(), key, value, headers));
 FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e6d8bc5..8519c4a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -498,7 +498,7 @@ public class Sender implements Runnable {
 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
 // If the batch is too large, we split the batch and send the split batches again. We do not decrement
 // the retry attempts in this case.
-log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}",
+log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
  correlationId,
  batch.topicPartition,
  this.retries - batch.attempts(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
--
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 2c7e4f9..41aa5c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.CompressionType;
 import