[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-09-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=140678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140678
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 03/Sep/18 18:13
Start Date: 03/Sep/18 18:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #5287: [BEAM-4038] 
Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287#issuecomment-418172023
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 140678)
Time Spent: 10h  (was: 9h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
>  Time Spent: 10h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-09-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=140687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140687
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 03/Sep/18 18:13
Start Date: 03/Sep/18 18:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] closed pull request #5287: [BEAM-4038] 
Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 0859233aad1..5ed9a04074e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1013,6 +1013,10 @@ private KafkaIO() {}
   );
 }
 
+public PTransform<PCollection<ProducerRecord<K, V>>, PDone> writeRecords() {
+  return new ProducerRecordWrite<>(this);
+}
+
 @Override
 public PDone expand(PCollection<KV<K, V>> input) {
   checkArgument(
@@ -1130,6 +1134,20 @@ public T decode(InputStream inStream) {
 }
   }
 
+  private static class ProducerRecordWrite<K, V> extends
+  PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
+
+private final Write<K, V> kvWriteTransform;
+
+ProducerRecordWrite(Write<K, V> kvWriteTransform){
+  this.kvWriteTransform = kvWriteTransform;
+}
+
+@Override public PDone expand(PCollection<ProducerRecord<K, V>> input) {
+  input.apply(ParDo.of(new ProducerRecordWriter<>(kvWriteTransform)));
+  return PDone.in(input.getPipeline());
+}
+  }
 
   /**
* Attempt to infer a {@link Coder} by extracting the type of the 
deserialized-class from the
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
new file mode 100644
index 000..5424e4278ca
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
@@ -0,0 +1,122 @@
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+/** */
+public class ProducerRecordCoder<K, V> extends 
StructuredCoder<ProducerRecord<K, V>> {
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarIntCoder intCoder = VarIntCoder.of();
+  private static final IterableCoder headerCoder =
+  IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of()));
+
+  private final KvCoder<K, V> kvCoder;
+
+  public static <K, V> ProducerRecordCoder<K, V> of(Coder<K> keyCoder, 
Coder<V> valueCoder) {
+return new ProducerRecordCoder<>(keyCoder, valueCoder);
+  }
+
+  public ProducerRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+  }
+
+  @Override
+  public void encode(ProducerRecord<K, V> value, OutputStream outStream) 
throws IOException {
+stringCoder.encode(value.topic(), outStream);
+intCoder.encode(value.partition(), outStream);
+longCoder.encode(value.timestamp(), outStream);
+headerCoder.encode(toIterable(value), outStream);
+kvCoder.encode(KV.of(value.key(), value.value()), outStream);
+  }
+
+  @Override
+  public ProducerRecord<K, V> decode(InputStream inStream) throws IOException {
+String topic = stringCoder.decode(inStream);
+Integer partition = intCoder.decode(inStream);
+Long timestamp = longCoder.decode(inStream);
+Headers headers = (Headers) toHeaders(headerCoder.decode(inStream));
+KV<K, V> kv = kvCoder.decode(inStream);
+if (ConsumerSpEL.hasHeaders) {
+  return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), 
kv.getValue(), headers);
+}
+return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), 
kv.getValue());
+  }
+
+  

[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-08-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=138484&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138484
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 27/Aug/18 17:21
Start Date: 27/Aug/18 17:21
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #5287: [BEAM-4038] 
Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287#issuecomment-416300484
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   




Issue Time Tracking
---

Worklog Id: (was: 138484)
Time Spent: 9h 50m  (was: 9h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
>  Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=116872&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116872
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 28/Jun/18 16:25
Start Date: 28/Jun/18 16:25
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5287: [BEAM-4038] 
Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287#issuecomment-401093308
 
 
   We have turned on autoformatting of the codebase, which causes small 
conflicts across the board. You can probably safely rebase and just keep your 
changes. Like this:
   
   ```
   $ git rebase
   ... see some conflicts
   $ git diff
   ... confirmed that the conflicts are just autoformatting
   ... so we can just keep our changes and do our own autoformat
   $ git checkout --theirs --
   $ git add -u
   $ git rebase --continue
   $ ./gradlew spotlessJavaApply
   ```
   
   Please ping me if you run into any difficulty. 




Issue Time Tracking
---

Worklog Id: (was: 116872)
Time Spent: 9h 40m  (was: 9.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
> Fix For: 2.6.0
>
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-05-07 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=99104&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99104
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 07/May/18 18:33
Start Date: 07/May/18 18:33
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5287: 
[BEAM-4038] Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287#discussion_r186504911
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -1130,6 +1134,20 @@ public T decode(InputStream inStream) {
 }
   }
 
+  private static class ProducerRecordWrite<K, V> extends
+  PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
+
+private final Write<K, V> kvWriteTransform;
+
+ProducerRecordWrite(Write<K, V> kvWriteTransform){
+  this.kvWriteTransform = kvWriteTransform;
+}
+
+@Override public PDone expand(PCollection<ProducerRecord<K, V>> input) {
+  input.apply(ParDo.of(new ProducerRecordWriter<>(kvWriteTransform)));
 
 Review comment:
   The main issue with this approach is that `kvWriteTransform`'s `expand()` is 
not invoked, so none of its validation checks are performed. I am also thinking 
about how best to organize this.
   
   One option is to have the main `Write` transform primarily work with 
ProducerRecord. 
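
The concern raised in this review comment can be illustrated without Beam. What follows is only a minimal sketch under stated assumptions: `Rec`, `RecordWrite`, and `KvWrite` are hypothetical stand-in classes (not Beam or Kafka APIs), and the `expand` methods merely play the role of a transform's `expand()`. It assumes the option mentioned above, where the record-based write owns the validation and the key/value write converts its input and delegates, so the checks cannot be bypassed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-ins only; these are not Beam or Kafka classes. */
class WriteDelegationSketch {

  /** Simplified record: a topic plus a key and a value. */
  static final class Rec {
    final String topic;
    final String key;
    final String value;

    Rec(String topic, String key, String value) {
      this.topic = topic;
      this.key = key;
      this.value = value;
    }
  }

  /** Record-based write: the single place where configuration is validated. */
  static final class RecordWrite {
    private final String bootstrapServers;
    final List<Rec> sent = new ArrayList<>();

    RecordWrite(String bootstrapServers) {
      this.bootstrapServers = bootstrapServers;
    }

    /** Plays the role of expand(): validate first, then "write". */
    void expand(List<Rec> input) {
      if (bootstrapServers == null || bootstrapServers.isEmpty()) {
        throw new IllegalArgumentException("bootstrap servers must be set");
      }
      sent.addAll(input);
    }
  }

  /** Key/value write: converts to records and delegates, so validation still runs. */
  static final class KvWrite {
    private final RecordWrite delegate;
    private final String topic;

    KvWrite(RecordWrite delegate, String topic) {
      this.delegate = delegate;
      this.topic = topic;
    }

    void expand(List<Map.Entry<String, String>> input) {
      List<Rec> records = new ArrayList<>();
      for (Map.Entry<String, String> kv : input) {
        records.add(new Rec(topic, kv.getKey(), kv.getValue()));
      }
      delegate.expand(records);
    }
  }

  public static void main(String[] args) {
    RecordWrite valid = new RecordWrite("localhost:9092");
    new KvWrite(valid, "events").expand(List.of(Map.entry("k", "v")));
    System.out.println("records written: " + valid.sent.size());

    try {
      new KvWrite(new RecordWrite(""), "events").expand(List.of(Map.entry("k", "v")));
    } catch (IllegalArgumentException e) {
      System.out.println("validation ran: " + e.getMessage());
    }
  }
}
```

With the wrapping direction used in the diff, the inner transform's checks are skipped; with delegation in this direction, a misconfigured write fails no matter which entry point is used.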




Issue Time Tracking
---

Worklog Id: (was: 99104)
Time Spent: 9.5h  (was: 9h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-05-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=98761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98761
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 06/May/18 02:03
Start Date: 06/May/18 02:03
Worklog Time Spent: 10m 
  Work Description: gkumar7 opened a new pull request #5287: [BEAM-4038] 
Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287
 
 
   Add support for writing ```ProducerRecords``` to Kafka; this also works with 
previous Kafka versions. @rangadi, I thought it would be beneficial to share 
these initial changes. If the approach seems good, I will refactor and add test 
cases. The following additional changes still need to be done:
   
   - [ ] Remove code duplication in  ```ProducerRecordCoder``` and 
```ProducerRecordWriter```.
   - [ ] Add checks for timestamp. 
 - this may not be necessary as Kafka versions before 0.10.1.0 are 
deprecated and timestamps were already added in this version 
[(KAFKA-2511)](https://issues.apache.org/jira/browse/KAFKA-2511). On the other 
hand, if this is required, as part of these changes, I would also like to 
remove ```KafkaTimestampType```.
   - [ ] Make similar changes to ```KafkaExactlyOnceSink``` to write 
```ProducerRecords```
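
Compatibility with older Kafka client versions depends on detecting at runtime whether the client exposes record headers, as the `ConsumerSpEL.hasHeaders` flag in PR #5111 does through reflection. The following is a minimal sketch of that kind of check. `OldRecord`, `NewRecord`, `HeaderList`, and `hasMethodReturning` are hypothetical names introduced for illustration; the real check inspects `ConsumerRecord` and `org.apache.kafka.common.header.Headers`.

```java
/** Hypothetical stand-ins only; the real check targets Kafka's ConsumerRecord. */
class HeaderSupportSketch {

  /** Stand-in for a headers container type. */
  static final class HeaderList {}

  /** Stand-in for a record class from an older client: no headers() method. */
  static final class OldRecord {}

  /** Stand-in for a record class from a newer client: exposes headers(). */
  static final class NewRecord {
    public HeaderList headers() {
      return new HeaderList();
    }
  }

  /**
   * Returns true if the type has a public no-argument method with the given
   * name whose return type has the given fully qualified name.
   */
  static boolean hasMethodReturning(Class<?> type, String methodName, String returnTypeName) {
    try {
      return type.getMethod(methodName).getReturnType().getName().equals(returnTypeName);
    } catch (NoSuchMethodException | SecurityException e) {
      // Method missing or inaccessible: treat the feature as unavailable.
      return false;
    }
  }

  public static void main(String[] args) {
    String expected = HeaderList.class.getName();
    System.out.println("new client: " + hasMethodReturning(NewRecord.class, "headers", expected));
    System.out.println("old client: " + hasMethodReturning(OldRecord.class, "headers", expected));
  }
}
```

Comparing the return type by name rather than by class reference means the check itself never needs the header type on the classpath, which is presumably why the merged code compares against a string.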
   
   




Issue Time Tracking
---

Worklog Id: (was: 98761)
Time Spent: 9h 20m  (was: 9h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=94250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94250
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 23/Apr/18 19:11
Start Date: 23/Apr/18 19:11
Worklog Time Spent: 10m 
  Work Description: iemejia closed pull request #5111: [BEAM-4038] Support 
Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index 493598672d9..7ddfaa06a4d 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -42,6 +42,7 @@
  * to eliminate the method definition differences.
  */
 class ConsumerSpEL {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(ConsumerSpEL.class);
 
   private SpelParserConfiguration config = new SpelParserConfiguration(true, 
true);
@@ -55,6 +56,21 @@
 
   private boolean hasRecordTimestamp = false;
   private boolean hasOffsetsForTimes = false;
+  static boolean hasHeaders = false;
+
+  static {
+try {
+  // It is supported by Kafka Client 0.11.0.0 onwards.
+  hasHeaders = ConsumerRecord
+  .class
+  .getMethod("headers", (Class[]) null)
+  .getReturnType()
+  .getName()
+  .equals("org.apache.kafka.common.header.Headers");
+} catch (NoSuchMethodException | SecurityException e) {
+  LOG.debug("Headers is not available");
+}
+  }
 
   public ConsumerSpEL() {
 try {
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index f4b1f1bb67b..25cc27bee4a 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import com.google.common.base.Objects;
 import java.io.Serializable;
 import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.common.header.Headers;
 
 /**
  * KafkaRecord contains key and value of the record as well as metadata for 
the record (topic name,
@@ -33,6 +36,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
   private final KV<K, V> kv;
   private final long timestamp;
   private final KafkaTimestampType timestampType;
@@ -43,9 +47,10 @@ public KafkaRecord(
   long offset,
   long timestamp,
   KafkaTimestampType timestampType,
+  @Nullable Headers headers,
   K key,
   V value) {
-this(topic, partition, offset, timestamp, timestampType, KV.of(key, 
value));
+this(topic, partition, offset, timestamp, timestampType, headers, 
KV.of(key, value));
   }
 
   public KafkaRecord(
@@ -54,16 +59,17 @@ public KafkaRecord(
   long offset,
   long timestamp,
   KafkaTimestampType timestampType,
+  @Nullable Headers headers,
   KV<K, V> kv) {
 this.topic = topic;
 this.partition = partition;
 this.offset = offset;
 this.timestamp = timestamp;
 this.timestampType = timestampType;
+this.headers = headers;
 this.kv = kv;
   }
 
-
   public String getTopic() {
 return topic;
   }
@@ -76,6 +82,14 @@ public long getOffset() {
 return offset;
   }
 
+  public Headers getHeaders() {
+if (!ConsumerSpEL.hasHeaders){
+  throw new RuntimeException("The version kafka-clients does not support 
record headers, "
+  + "please use version 0.11.0.0 or newer");
+}
+return headers;
+  }
+
   public KV<K, V> getKV() {
 return kv;
   }
@@ -90,7 +104,7 @@ public KafkaTimestampType getTimestampType() {
 
   @Override
   public int hashCode() {
-return Arrays.deepHashCode(new Object[]{topic, partition, offset, 
timestamp, kv});
+return Arrays.deepHashCode(new Object[] {topic, partition, offset, 
timestamp, headers, kv});
   }
 
   @Override
@@ -102,6 +116,7 @@ public boolean equals(Object obj) {
   && partition == other.partition
   && offset == other.offset
   && timestamp == other.timestamp
+  && Objects.equal(headers, other.headers)
   && kv.equals(other.kv);
 } else {
   return false;
diff --git 

[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=94203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94203
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 23/Apr/18 17:48
Start Date: 23/Apr/18 17:48
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on issue #5111: [BEAM-4038] Support 
Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#issuecomment-383663106
 
 
   Yup




Issue Time Tracking
---

Worklog Id: (was: 94203)
Time Spent: 9h  (was: 8h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 9h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=94166&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94166
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 23/Apr/18 17:26
Start Date: 23/Apr/18 17:26
Worklog Time Spent: 10m 
  Work Description: rangadi commented on issue #5111: [BEAM-4038] Support 
Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#issuecomment-383655889
 
 
   @gkumar7, did you push all your changes?




Issue Time Tracking
---

Worklog Id: (was: 94166)
Time Spent: 8h 50m  (was: 8h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=93220&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93220
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 14:07
Start Date: 20/Apr/18 14:07
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r183058074
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,36 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+// ConsumerRecord is used to simply create a list of headers
+ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+
+if (!ConsumerSpEL.hasHeaders) {
+  return null;
+} else if (!records.iterator().hasNext()) {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93220)
Time Spent: 8h 40m  (was: 8.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=93219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93219
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 14:07
Start Date: 20/Apr/18 14:07
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r183061685
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,36 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+// ConsumerRecord is used to simply create a list of headers
+ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+
+if (!ConsumerSpEL.hasHeaders) {
+  return null;
+} else if (!records.iterator().hasNext()) {
+  return consumerRecord.headers();
+}
+
+records.forEach(kv -> consumerRecord.headers().add(kv.getKey(), 
kv.getValue()));
+return consumerRecord.headers();
+  }
+
+  private Iterable<KV<String, byte[]>> toIterable(KafkaRecord record) {
+if (!ConsumerSpEL.hasHeaders || record.getHeaders() == null){
 
 Review comment:
   ```record.getHeaders()``` could have been null when ```toHeaders()``` 
returned null even if headers were supported (in the previous change I updated 
this to return an empty iterable of headers).
   
   Agreed that it is not needed anymore.
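
The null handling discussed here can be shown in isolation. This is a minimal sketch under stated assumptions, not the code from the pull request: `Header` is a hypothetical stand-in for a Kafka header, and `toIterable` takes a plain list rather than a `KafkaRecord`. It shows the defensive form, where a missing header collection is encoded as an empty iterable so the coder never sees null.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-ins only; these are not Beam or Kafka classes. */
class HeaderIterableSketch {

  /** Stand-in for a header: a string key with a (possibly null) byte value. */
  static final class Header {
    final String key;
    final byte[] value;

    Header(String key, byte[] value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Converts headers to key/value pairs; a null input becomes an empty list. */
  static List<Map.Entry<String, byte[]>> toIterable(List<Header> headers) {
    if (headers == null) {
      return Collections.emptyList();
    }
    List<Map.Entry<String, byte[]>> pairs = new ArrayList<>();
    for (Header header : headers) {
      // SimpleImmutableEntry is used because it permits null values.
      pairs.add(new SimpleImmutableEntry<>(header.key, header.value));
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<Header> headers = new ArrayList<>();
    headers.add(new Header("trace-id", new byte[] {1, 2, 3}));
    System.out.println("pairs from one header: " + toIterable(headers).size());
    System.out.println("pairs from null: " + toIterable(null).size());
  }
}
```

Once the decoding side always returns an empty collection instead of null, the null branch becomes dead code, which matches the conclusion reached in the comment above.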




Issue Time Tracking
---

Worklog Id: (was: 93219)
Time Spent: 8.5h  (was: 8h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=93077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93077
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 05:19
Start Date: 20/Apr/18 05:19
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182947417
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,36 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+// ConsumerRecord is used to simply create a list of headers
+ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+
+if (!ConsumerSpEL.hasHeaders) {
+  return null;
+} else if (!records.iterator().hasNext()) {
+  return consumerRecord.headers();
+}
+
+records.forEach(kv -> consumerRecord.headers().add(kv.getKey(), 
kv.getValue()));
+return consumerRecord.headers();
+  }
+
+  private Iterable<KV<String, byte[]>> toIterable(KafkaRecord record) {
+if (!ConsumerSpEL.hasHeaders || record.getHeaders() == null){
 
 Review comment:
   record.getHeaders() is never expected to return null (or did you mean to 
check `record.headers == null`?)




Issue Time Tracking
---

Worklog Id: (was: 93077)
Time Spent: 8h 20m  (was: 8h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=93078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93078
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 05:19
Start Date: 20/Apr/18 05:19
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182947337
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,36 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+// ConsumerRecord is used to simply create a list of headers
+ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+
+if (!ConsumerSpEL.hasHeaders) {
+  return null;
+} else if (!records.iterator().hasNext()) {
 
 Review comment:
   You can remove else part.
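
One reading of this suggestion is that the `else if` branch for an empty input is redundant, because iterating over an empty collection adds nothing. The following minimal sketch compares both shapes under that assumption. The names `withEmptyBranch` and `withoutEmptyBranch` are hypothetical, a plain list stands in for Kafka's `Headers`, and the `hasHeaders` parameter stands in for `ConsumerSpEL.hasHeaders`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-ins only; a List plays the role of Kafka's Headers. */
class ToHeadersSketch {

  /** Shape of the reviewed code: an explicit branch for an empty input. */
  static List<Map.Entry<String, byte[]>> withEmptyBranch(
      Iterable<Map.Entry<String, byte[]>> records, boolean hasHeaders) {
    List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
    if (!hasHeaders) {
      return null;
    } else if (!records.iterator().hasNext()) {
      return headers;
    }
    records.forEach(headers::add);
    return headers;
  }

  /** Simplified shape: forEach over an empty input is already a no-op. */
  static List<Map.Entry<String, byte[]>> withoutEmptyBranch(
      Iterable<Map.Entry<String, byte[]>> records, boolean hasHeaders) {
    if (!hasHeaders) {
      return null;
    }
    List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
    records.forEach(headers::add);
    return headers;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, byte[]>> empty = Collections.emptyList();
    Map.Entry<String, byte[]> header = new SimpleImmutableEntry<>("trace-id", new byte[] {1, 2});
    List<Map.Entry<String, byte[]>> one = Collections.singletonList(header);

    System.out.println("empty input agrees: "
        + (withEmptyBranch(empty, true).size() == withoutEmptyBranch(empty, true).size()));
    System.out.println("one header agrees: "
        + (withEmptyBranch(one, true).size() == withoutEmptyBranch(one, true).size()));
  }
}
```

The narrower reading, dropping only the `else` keyword after an early `return`, leaves the behavior unchanged as well.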




Issue Time Tracking
---

Worklog Id: (was: 93078)
Time Spent: 8h 20m  (was: 8h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=93075=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93075
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 04:55
Start Date: 20/Apr/18 04:55
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182945790
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -139,4 +156,11 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
   return offsetAndTimestamp.offset();
 }
   }
+
+  public Headers getHeaders(ConsumerRecord rawRecord) {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93075)
Time Spent: 8h 10m  (was: 8h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=93064=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93064
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 04:14
Start Date: 20/Apr/18 04:14
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182942212
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -139,4 +156,11 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
   return offsetAndTimestamp.offset();
 }
   }
+
+  public Headers getHeaders(ConsumerRecord rawRecord) {
 
 Review comment:
   Yes, that's what I was thinking. 'hasHeaders' is already used there.




Issue Time Tracking
---

Worklog Id: (was: 93064)
Time Spent: 8h  (was: 7h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 8h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92969=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92969
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 00:34
Start Date: 20/Apr/18 00:34
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182916997
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,34 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable> records) {
+if (!records.iterator().hasNext()){
+  return null;
+}
+
+// ConsumerRecord is used to simply create a list of headers
+ConsumerRecord consumerRecord = new ConsumerRecord<>(
 
 Review comment:
   ```Headers headers = ... ```
   raises a ```ClassNotFoundException``` when serializing 
```KafkaRecordCoder```.
   
   Updated to use empty strings instead of "key", "value"




Issue Time Tracking
---

Worklog Id: (was: 92969)
Time Spent: 7h 40m  (was: 7.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92971=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92971
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 00:34
Start Date: 20/Apr/18 00:34
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182917521
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,34 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable> records) {
+if (!records.iterator().hasNext()){
+  return null;
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92971)
Time Spent: 7h 50m  (was: 7h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92970=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92970
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 20/Apr/18 00:34
Start Date: 20/Apr/18 00:34
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182920256
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -139,4 +156,11 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
   return offsetAndTimestamp.offset();
 }
   }
+
+  public Headers getHeaders(ConsumerRecord rawRecord) {
 
 Review comment:
   This is used by ```KafkaUnboundedReader```. Would you like to move this 
logic to that class?




Issue Time Tracking
---

Worklog Id: (was: 92970)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92881=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92881
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 22:21
Start Date: 19/Apr/18 22:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182899943
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,34 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable> records) {
+if (!records.iterator().hasNext()){
+  return null;
+}
+
+// ConsumerRecord is used to simply create a list of headers
+ConsumerRecord consumerRecord = new ConsumerRecord<>(
 
 Review comment:
   Change this to 
   ```
   Headers headers = ... 
   records.forEach(... headers.add());
   ```
   Also use empty strings rather than "key", "value" etc.




Issue Time Tracking
---

Worklog Id: (was: 92881)
Time Spent: 7h 10m  (was: 7h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92882=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92882
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 22:21
Start Date: 19/Apr/18 22:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182901167
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,34 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+(Headers) toHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Object toHeaders(Iterable> records) {
+if (!records.iterator().hasNext()){
+  return null;
 
 Review comment:
   When headers are supported, this should be an empty iterable of headers (to 
match ConsumerRecord and to avoid returning nulls to the user).
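
The suggestion above (return an empty collection instead of null when nothing was decoded) can be sketched as follows. This is a minimal illustration, not the code in the pull request: the class `EmptyHeadersSketch` is hypothetical, and `Map.Entry<String, byte[]>` stands in for Kafka's header type so that the example compiles without kafka-clients on the classpath.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the coder's toHeaders(); not the Beam implementation. */
final class EmptyHeadersSketch {

  /**
   * Copies decoded (key, value) pairs into a read-only list. An input with no
   * entries yields an empty list, so callers never have to handle null.
   */
  static List<Map.Entry<String, byte[]>> toHeaders(
      Iterable<Map.Entry<String, byte[]>> decoded) {
    List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
    for (Map.Entry<String, byte[]> kv : decoded) {
      headers.add(new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
    }
    return Collections.unmodifiableList(headers);
  }

  public static void main(String[] args) {
    List<Map.Entry<String, byte[]>> none =
        toHeaders(Collections.<Map.Entry<String, byte[]>>emptyList());
    // The result is an empty list rather than null.
    System.out.println("empty input gives empty result: " + none.isEmpty());
  }
}
```

With this shape, a caller can iterate over the result unconditionally; the "no headers" case is just a loop that runs zero times.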




Issue Time Tracking
---

Worklog Id: (was: 92882)
Time Spent: 7h 20m  (was: 7h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92883=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92883
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 22:21
Start Date: 19/Apr/18 22:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182899063
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -139,4 +156,11 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
   return offsetAndTimestamp.offset();
 }
   }
+
+  public Headers getHeaders(ConsumerRecord rawRecord) {
 
 Review comment:
   Since hasHeaders is checked directly, this method can be removed.




Issue Time Tracking
---

Worklog Id: (was: 92883)
Time Spent: 7.5h  (was: 7h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92375=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92375
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182627994
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Alright, this works. Made a few more changes to ensure 
```ClassNotFoundException``` does not occur during serialization of 
```KafkaRecordCoder```.




Issue Time Tracking
---

Worklog Id: (was: 92375)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92373=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92373
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182627458
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +77,32 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Headers getHeaders(Iterable> records) {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92373)
Time Spent: 6h 40m  (was: 6.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92376=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92376
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182628417
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -76,6 +80,10 @@ public long getOffset() {
 return offset;
   }
 
+  public Headers getHeaders() {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92376)
Time Spent: 7h  (was: 6h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92374=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92374
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182629453
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 ##
 @@ -212,6 +212,7 @@ public boolean advance() {
 rawRecord.offset(),
 consumerSpEL.getRecordTimestamp(rawRecord),
 consumerSpEL.getRecordTimestampType(rawRecord),
+consumerSpEL.getHeaders(rawRecord),
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92374)
Time Spent: 6h 50m  (was: 6h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92343=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92343
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:55
Start Date: 19/Apr/18 01:55
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182616263
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Hmm, maybe that is one of the reasons `ConsumerRecord.headers()` returns 
mutable headers. Could we just use that?




Issue Time Tracking
---

Worklog Id: (was: 92343)
Time Spent: 6.5h  (was: 6h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92326=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92326
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:15
Start Date: 19/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182611605
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Thank you for sharing this. Yes, this works as expected, but the issue comes 
when we initialize Headers in the if branch like so (similar to my example):
   
   ```java
   if (isPresent && kafkaHeaders == null) {
   Headers headers = new RecordHeaders();
   headers.add("headerKey", "headerVal".getBytes());
   return new ConsumerRecord<>("test", 0, 0L, "key", "value").headers();
   }
   ```
   Even with is.present set to False, this throws a 
```ClassNotFoundException```.
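
One common way to avoid a hard reference to a class that may be missing from an older client library is to probe for it by name at runtime and branch on the result. The sketch below shows that general technique only. The class `OptionalClassSketch` and the method `isClassAvailable` are hypothetical, and this is not necessarily how `ConsumerSpEL.hasHeaders` is computed in the pull request.

```java
/** Hypothetical helper; the detection logic in the pull request may differ. */
final class OptionalClassSketch {

  /**
   * Returns true if the named class can be loaded. The class is looked up by
   * name only, so this code compiles and runs even when it is absent.
   */
  static boolean isClassAvailable(String className) {
    try {
      // initialize=false: only locate the class, do not run its static initializers.
      Class.forName(className, false, OptionalClassSketch.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("java.lang.String available: "
        + isClassAvailable("java.lang.String"));
    // The answer depends on which kafka-clients version, if any, is on the classpath.
    System.out.println("Kafka Headers available: "
        + isClassAvailable("org.apache.kafka.common.header.Headers"));
  }
}
```

The flag only helps if every code path that touches the optional type is guarded by it and kept out of signatures that are resolved eagerly, which is presumably why the diffs in this thread declare `toHeaders` as returning `Object`.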




Issue Time Tracking
---

Worklog Id: (was: 92326)
Time Spent: 6h 20m  (was: 6h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92322=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92322
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:12
Start Date: 19/Apr/18 01:12
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182611605
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Thank you for sharing this. Yes, this works as expected, but the issue comes 
when we initialize Headers in the if branch like so (similar to my example):
   
   ```java
   if (isPresent && kafkaHeaders == null) {
   Headers headers = new RecordHeaders();
   headers.add("headerKey", "headerVal".getBytes());
   return new ConsumerRecord<>("test", 0, 0L, "key", "value").headers();
   }
   ```
   Even with is.present set to False, this throws a runtime exception.




Issue Time Tracking
---

Worklog Id: (was: 92322)
Time Spent: 6h 10m  (was: 6h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92038
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 18/Apr/18 03:25
Start Date: 18/Apr/18 03:25
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182298038
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +77,32 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Headers getHeaders(Iterable> records) {
 
 Review comment:
   minor: call this `toHeaders()`? same for `getIterable()`




Issue Time Tracking
---

Worklog Id: (was: 92038)
Time Spent: 6h  (was: 5h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92037
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 18/Apr/18 03:25
Start Date: 18/Apr/18 03:25
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182297723
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -76,6 +80,10 @@ public long getOffset() {
 return offset;
   }
 
+  public Headers getHeaders() {
 
 Review comment:
   I think this should throw an exception if 'hasHeaders' is false. We don't 
need the user to worry about null values. `ConsumerRecord.headers` is not 
expected to be null either. We can do this once we know the current solution 
works as expected with older kafka-clients versions.
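
A minimal sketch of the behaviour suggested in this comment, under stated assumptions: `RecordWithOptionalHeaders` is a hypothetical stand-in rather than the real `KafkaRecord`, headers are simplified to a plain `Map`, and `IllegalStateException` is an illustrative choice (the review only says "an exception").

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for a record type whose headers may be unavailable.
final class RecordWithOptionalHeaders {
  // Assumption: null means the client library in use has no header support.
  private final Map<String, byte[]> headers;

  RecordWithOptionalHeaders(Map<String, byte[]> headers) {
    this.headers = headers;
  }

  boolean hasHeaders() {
    return headers != null;
  }

  // Fails loudly instead of returning null, so callers never see a null value.
  Map<String, byte[]> getHeaders() {
    if (!hasHeaders()) {
      throw new IllegalStateException("Headers are not available for this record");
    }
    return Collections.unmodifiableMap(headers);
  }
}
```

With this shape, callers that care about older clients can guard with `hasHeaders()`, while everyone else can call `getHeaders()` without null checks.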




Issue Time Tracking
---

Worklog Id: (was: 92037)
Time Spent: 5h 50m  (was: 5h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92039=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92039
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 18/Apr/18 03:25
Start Date: 18/Apr/18 03:25
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182299393
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Thanks for the update. I missed the later part of your comment above.
   
   > and I package this class and use it as a dependency in another application 
which simply instantiates this class. Even though the downstream application 
did not call getHeaders, it will still throw a runtime exception.
   
   Is that true? Can you try it with a small, simple Java class? I tested with a 
small test app and it worked OK. 
   Run it with different versions of Kafka, and with '-Dis.present=True|False':
   
   
   package org.example;
   
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.header.Headers;
   
   public class TestRuntimeLoading {
   
     public static class A {
   
       Headers kafkaHeaders = null;
   
       Headers getHeaders(boolean isPresent) {
         if (isPresent && kafkaHeaders == null) {
           return new ConsumerRecord<>("test", 0, 0L, "key", "value").headers();
         } else {
           throw new RuntimeException("no headers");
         }
       }
     }
   
     public static void main(String[] args) {
       boolean isPresent = Boolean.valueOf(System.getProperty("is.present", "True"));
       Object headers = new A().getHeaders(isPresent);
       System.out.println("Headers : " + headers.toString());
     }
   }
   
   




Issue Time Tracking
---

Worklog Id: (was: 92039)
Time Spent: 6h  (was: 5h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92004=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92004
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182273236
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -94,7 +96,7 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
  -150_000L,
  -120_000L,
  -140_000L,
-  100_000L,  // <<< timestamp is in future
+ 100_000L, // <<< timestamp is in future
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92004)
Time Spent: 5h 20m  (was: 5h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92007=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92007
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182272907
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,24 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92007)
Time Spent: 5h 40m  (was: 5.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91999
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182272666
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -139,4 +158,14 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
   return offsetAndTimestamp.offset();
 }
   }
+
+  public Headers getHeaders(ConsumerRecord rawRecord) {
+Headers recordHeaders = new RecordHeaders();
 
 Review comment:
   Updated: instead of throwing a RuntimeException, it now simply returns null for the time being.




Issue Time Tracking
---

Worklog Id: (was: 91999)
Time Spent: 4h 50m  (was: 4h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92006=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92006
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182273145
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -46,13 +47,14 @@
   .map(ts -> {
 Instant result = policy.getTimestampForRecord(
   null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
-  KafkaTimestampType.CREATE_TIME, "key", 
"value"));
+  KafkaTimestampType.CREATE_TIME,
+  new RecordHeaders(),
 
 Review comment:
   Kept headers null for this test, added tests to KafkaRecordCoderTest for 
null and non-null cases.




Issue Time Tracking
---

Worklog Id: (was: 92006)
Time Spent: 5.5h  (was: 5h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92005=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92005
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182273223
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -79,7 +81,7 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
 -150_000L,
 -120_000L,
 -140_000L,
--100_000L,  // <<< Max timestamp
+-100_000L, // <<< Max timestamp
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92005)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92003=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92003
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182273301
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Sure will do. See updated comment above.




Issue Time Tracking
---

Worklog Id: (was: 92003)
Time Spent: 5h 10m  (was: 5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92001=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92001
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182272599
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -131,6 +149,7 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
 .offsetsForTimes(ImmutableMap.of(topicPartition, time.getMillis()))
 .values());
 
+
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92001)
Time Spent: 5h 10m  (was: 5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92002
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182273112
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,24 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Headers getHeaders(Iterable> records) {
+Headers headers = new RecordHeaders();
 
 Review comment:
   Instantiating via ```ConsumerRecord``` is a bit hacky. It should be fine to 
use ```RecordHeaders``` directly. If it were not meant to be used directly, it 
should have been encapsulated more thoroughly (package-private).




Issue Time Tracking
---

Worklog Id: (was: 92002)
Time Spent: 5h 10m  (was: 5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92000=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92000
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182272769
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -90,7 +98,7 @@ public KafkaTimestampType getTimestampType() {
 
   @Override
   public int hashCode() {
-return Arrays.deepHashCode(new Object[]{topic, partition, offset, 
timestamp, kv});
+return Arrays.deepHashCode(new Object[] {topic, partition, offset, 
timestamp, headers, kv});
 
 Review comment:
   Verified, null works as expected.
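
For illustration, a small self-contained sketch of why a null field is safe in this style of `hashCode`: `Arrays.deepHashCode` treats a null element as contributing 0 rather than throwing. The class name `NullableFieldHash` and the `Object`-typed `headers` and `kv` parameters are stand-ins assumed here, not the PR's actual types.

```java
import java.util.Arrays;

// Hypothetical helper mirroring the field list from the diff above.
final class NullableFieldHash {
  static int hash(
      String topic, int partition, long offset, long timestamp, Object headers, Object kv) {
    // Primitives are boxed; a null element (for example, absent headers) is allowed.
    return Arrays.deepHashCode(new Object[] {topic, partition, offset, timestamp, headers, kv});
  }
}
```

Calling `NullableFieldHash.hash("topic", 0, 0L, 0L, null, "kv")` returns a stable value and does not throw a `NullPointerException`.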




Issue Time Tracking
---

Worklog Id: (was: 92000)
Time Spent: 5h  (was: 4h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91998=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91998
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:43
Start Date: 17/Apr/18 23:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182272786
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -55,6 +64,7 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.encode(value.getOffset(), outStream);
 longCoder.encode(value.getTimestamp(), outStream);
 intCoder.encode(value.getTimestampType().ordinal(), outStream);
+headerCoder.encode(getIterable(value.getHeaders()), outStream);
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 91998)
Time Spent: 4h 40m  (was: 4.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91994=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91994
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:24
Start Date: 17/Apr/18 23:24
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182243015
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Could you explain how this would work? Marking this as ```@Nullable``` would 
not circumvent the issue that the ```Headers``` interface would still need to 
be on the classpath. To verify, I made the requested changes and received the 
following runtime error:
   
   ```
   Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/header/Headers
at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.getOutputCoder(KafkaUnboundedSource.java:142)
   ```
   
   This occurs when using kafka-clients 0.10.1.0. The ```Headers``` interface 
is only available from 0.11.0.0 onward.
   
   Did a few more tests, here is a simple example:
   
   Suppose I have a class with a single *unused* method:
   ```
   public void getHeaders() {
     if (!hasHeaders()) { // reflection logic using ConsumerRecord
       return;
     }
     Headers headers = new RecordHeaders();
     headers.add(new RecordHeader("hi", "hi".getBytes()));
   }
   ```
   and I package this class and use it as a dependency in another application 
which simply instantiates this class. Even though the downstream application 
did not call ```getHeaders```, it will still throw a runtime exception.
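
The `hasHeaders()` check in the snippet above is described only as reflection logic. As a hedged illustration, a runtime probe for an optional class or method could look like the sketch below. The names `ClasspathProbe`, `isClassPresent`, and `hasPublicMethod` are hypothetical and not taken from the PR; the code uses only JDK reflection, so it runs whether or not kafka-clients is on the classpath.

```java
import java.lang.reflect.Method;

// Hypothetical helper: detects optional classes and methods without linking against them.
final class ClasspathProbe {

  // Returns true if the named class can be loaded; the class is not initialized.
  static boolean isClassPresent(String className) {
    try {
      Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  // Returns true if the type exposes at least one public method with the given name.
  static boolean hasPublicMethod(Class<?> type, String methodName) {
    for (Method method : type.getMethods()) {
      if (method.getName().equals(methodName)) {
        return true;
      }
    }
    return false;
  }
}
```

For example, `ClasspathProbe.isClassPresent("org.apache.kafka.common.header.Headers")` would be expected to return false when an older kafka-clients jar without that interface is in use, which lets the caller decide up front whether header-related code paths may be taken.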
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 91994)
Time Spent: 4h 20m  (was: 4h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91995=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91995
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 23:24
Start Date: 17/Apr/18 23:24
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182270754
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Can you push your changes? The `KafkaRecordCoder` constructor does not access 
Headers at all. Headers is accessed only in encode and decode (where we should 
handle null values for headers).
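
One common way to handle null in an encode/decode pair is to write a presence flag first. The sketch below illustrates that idea only; it is not the PR's coder. `NullableHeadersCodec` is a hypothetical name, plain `java.io` streams are used instead of Beam coders, and headers are simplified to a `Map` (real Kafka headers allow duplicate keys and null values, which this sketch does not cover).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec: a leading boolean records whether headers were present.
final class NullableHeadersCodec {

  static byte[] encode(Map<String, byte[]> headers) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeBoolean(headers != null); // presence flag
      if (headers != null) {
        out.writeInt(headers.size());
        for (Map.Entry<String, byte[]> entry : headers.entrySet()) {
          out.writeUTF(entry.getKey());
          out.writeInt(entry.getValue().length);
          out.write(entry.getValue());
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static Map<String, byte[]> decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      if (!in.readBoolean()) {
        return null; // headers were absent when encoded
      }
      int count = in.readInt();
      Map<String, byte[]> headers = new LinkedHashMap<>();
      for (int i = 0; i < count; i++) {
        String key = in.readUTF();
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        headers.put(key, value);
      }
      return headers;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A round trip such as `NullableHeadersCodec.decode(NullableHeadersCodec.encode(null))` yields null again, so the absent case survives serialization unchanged.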




Issue Time Tracking
---

Worklog Id: (was: 91995)
Time Spent: 4.5h  (was: 4h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91943=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91943
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 21:43
Start Date: 17/Apr/18 21:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182243015
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Could you explain how this would work? Marking this as ```@Nullable``` would 
not circumvent the issue that the ```Headers``` interface would still need to 
be on the classpath. To verify, I made the requested changes and received the 
following runtime error:
   
   ```
   Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/header/Headers
at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.getOutputCoder(KafkaUnboundedSource.java:142)
   ```
   
   This occurs when using kafka-clients 0.10.1.0. The ```Headers``` interface 
is only available from 0.11.0.0 onward.
   
   




Issue Time Tracking
---

Worklog Id: (was: 91943)
Time Spent: 4h 10m  (was: 4h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91938=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91938
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 21:42
Start Date: 17/Apr/18 21:42
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182243015
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Could you explain how this would work? Marking this as ```@Nullable``` would 
not circumvent the issue that the ```Headers``` class would still need to be 
on the classpath. To verify, I made the requested changes and received the 
following runtime error:
   
   ```
   Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/header/Headers
at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.getOutputCoder(KafkaUnboundedSource.java:142)
   ```
   
   This occurs when using kafka-clients 0.10.1.0.
   
   




Issue Time Tracking
---

Worklog Id: (was: 91938)
Time Spent: 4h  (was: 3h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91839=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91839
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182157813
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -76,6 +80,10 @@ public long getOffset() {
 return offset;
   }
 
+  public Headers getHeaders() {
 
 Review comment:
   Need to call getHeaders from `ConsumerSpEL` or move it here.




Issue Time Tracking
---

Worklog Id: (was: 91839)
Time Spent: 3h  (was: 2h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91838=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91838
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182156839
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -139,4 +158,14 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
   return offsetAndTimestamp.offset();
 }
   }
+
+  public Headers getHeaders(ConsumerRecord rawRecord) {
+Headers recordHeaders = new RecordHeaders();
 
 Review comment:
   Need to avoid accessing anything related to `Headers` when `hasHeaders` is 
false, so that it does not cause a runtime exception.
   I would implement this method something like the following (it could move 
to `KafkaRecord` as well): 
   ```
 if (hasHeaders) {
   // No need to convert to RecordHeader (note that RecordHeader is an internal class).
   return rawRecord.headers();
 } else {
   throw new RuntimeException(
       "The version of kafka-clients does not support record headers, "
           + "please use version 0.11.0.0 or newer.");
 }
   ```
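
The guard described in this comment can be sketched without any Kafka dependency. This is an illustration under stated assumptions, not the code in the pull request: `GuardedHeaderAccess`, `OldRecord`, `NewRecord` and `headersOrNull` are hypothetical stand-ins, and the capability check uses plain reflection on a zero-argument `headers()` method.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: guards header access behind a one-time capability check. */
final class GuardedHeaderAccess {

  /** Stand-in for a record type from an old client: no headers() accessor. */
  public static final class OldRecord {}

  /** Stand-in for a record type from a newer client that exposes headers(). */
  public static final class NewRecord {
    public List<String> headers() {
      return Collections.singletonList("trace-id");
    }
  }

  /** Null when the record class has no public zero-argument headers() method. */
  private final Method headersMethod;

  GuardedHeaderAccess(Class<?> recordClass) {
    Method found;
    try {
      found = recordClass.getMethod("headers");
    } catch (NoSuchMethodException e) {
      found = null;
    }
    this.headersMethod = found;
  }

  boolean hasHeaders() {
    return headersMethod != null;
  }

  /** Strict accessor: fails loudly when headers are not supported. */
  Object getHeaders(Object rawRecord) {
    if (!hasHeaders()) {
      throw new UnsupportedOperationException(
          "This client version does not support record headers.");
    }
    try {
      return headersMethod.invoke(rawRecord);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not read headers", e);
    }
  }

  /** Lenient accessor for a read path: null means "no header support". */
  Object headersOrNull(Object rawRecord) {
    return hasHeaders() ? getHeaders(rawRecord) : null;
  }
}
```

Keeping both a strict and a lenient accessor mirrors the two behaviours discussed in the review: throw when a caller explicitly asks for headers, but pass null when a record is merely being constructed.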
  
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 91838)
Time Spent: 2h 50m  (was: 2h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91842=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91842
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182158860
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,24 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 
 Review comment:
   Same here, set headers to null in case of empty iterable (again in order to 
handle old versions properly).




Issue Time Tracking
---

Worklog Id: (was: 91842)
Time Spent: 3h 20m  (was: 3h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91846=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91846
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182158583
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -55,6 +64,7 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.encode(value.getOffset(), outStream);
 longCoder.encode(value.getTimestamp(), outStream);
 intCoder.encode(value.getTimestampType().ordinal(), outStream);
+headerCoder.encode(getIterable(value.getHeaders()), outStream);
 
 Review comment:
   Need to handle nulls. Might want to serialize an empty iterable in case of null.
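
As a rough illustration of this convention (null is written as an empty sequence, and an empty sequence is read back as null), here is a self-contained round trip using only `java.io`. It is a sketch and does not use Beam's `Coder` API: `NullableHeadersCodec` and its wire layout (an entry count followed by key and value pairs) are assumptions made for the example, and header values are assumed to be non-null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical codec: null headers are encoded as a count of zero. */
final class NullableHeadersCodec {

  static void encode(List<Map.Entry<String, byte[]>> headers, DataOutputStream out)
      throws IOException {
    if (headers == null) {
      out.writeInt(0); // null is serialized exactly like an empty list
      return;
    }
    out.writeInt(headers.size());
    for (Map.Entry<String, byte[]> header : headers) {
      out.writeUTF(header.getKey());
      out.writeInt(header.getValue().length);
      out.write(header.getValue());
    }
  }

  static List<Map.Entry<String, byte[]>> decode(DataInputStream in) throws IOException {
    int count = in.readInt();
    if (count == 0) {
      return null; // empty on the wire is read back as "no headers"
    }
    List<Map.Entry<String, byte[]>> headers = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      String key = in.readUTF();
      byte[] value = new byte[in.readInt()];
      in.readFully(value);
      headers.add(new SimpleEntry<>(key, value));
    }
    return headers;
  }

  /** Encodes and immediately decodes, wrapping IOException for convenience. */
  static List<Map.Entry<String, byte[]>> roundTrip(List<Map.Entry<String, byte[]>> headers) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      encode(headers, new DataOutputStream(bytes));
      return decode(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

One consequence of this design is that a non-null but empty header list also decodes to null, so the two cases cannot be told apart after a round trip.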




Issue Time Tracking
---

Worklog Id: (was: 91846)
Time Spent: 3h 50m  (was: 3h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91840=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91840
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182159443
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +76,24 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Headers getHeaders(Iterable> records) {
+Headers headers = new RecordHeaders();
 
 Review comment:
   Not sure if we should instantiate `RecordHeaders` directly, since it is 
marked internal. Another option is to instantiate it through a `ConsumerRecord`.




Issue Time Tracking
---

Worklog Id: (was: 91840)
Time Spent: 3h 10m  (was: 3h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91843=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91843
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182157490
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Mark this `@Nullable` (It needs to be null to support old versions).




Issue Time Tracking
---

Worklog Id: (was: 91843)
Time Spent: 3h 20m  (was: 3h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91837=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91837
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182159915
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 ##
 @@ -212,6 +212,7 @@ public boolean advance() {
 rawRecord.offset(),
 consumerSpEL.getRecordTimestamp(rawRecord),
 consumerSpEL.getRecordTimestampType(rawRecord),
+consumerSpEL.getHeaders(rawRecord),
 
 Review comment:
   Here, we want to set headers to null if `hasHeaders` is false.




Issue Time Tracking
---

Worklog Id: (was: 91837)
Time Spent: 2h 40m  (was: 2.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91835=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91835
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182154117
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 ##
 @@ -131,6 +149,7 @@ public long offsetForTime(Consumer consumer, 
TopicPartition topicPartition
 .offsetsForTimes(ImmutableMap.of(topicPartition, time.getMillis()))
 .values());
 
+
 
 Review comment:
   Remove




Issue Time Tracking
---

Worklog Id: (was: 91835)
Time Spent: 2h 20m  (was: 2h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91841=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91841
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182160738
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -46,13 +47,14 @@
   .map(ts -> {
 Instant result = policy.getTimestampForRecord(
   null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
-  KafkaTimestampType.CREATE_TIME, "key", 
"value"));
+  KafkaTimestampType.CREATE_TIME,
+  new RecordHeaders(),
 
 Review comment:
   Please include a test with null headers. Also add a couple of simple tests 
to `KafkaRecordCoderTest` to ensure both null and non-null headers are handled 
correctly in the coder.




Issue Time Tracking
---

Worklog Id: (was: 91841)
Time Spent: 3h 10m  (was: 3h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91844=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91844
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182157943
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -90,7 +98,7 @@ public KafkaTimestampType getTimestampType() {
 
   @Override
   public int hashCode() {
-return Arrays.deepHashCode(new Object[]{topic, partition, offset, 
timestamp, kv});
+return Arrays.deepHashCode(new Object[] {topic, partition, offset, 
timestamp, headers, kv});
 
 Review comment:
   Please check the javadoc for `deepHashCode()` to make sure null is ok (most 
likely yes).
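
For reference, `java.util.Arrays.deepHashCode` accepts null elements and treats each one as contributing a hash of 0. The snippet below is a small standalone check of that behaviour; `DeepHashCodeNullDemo` and its `hash` helper are hypothetical names, and the field list only imitates the shape of the array in the diff above.

```java
import java.util.Arrays;

/** Hypothetical demo: Arrays.deepHashCode tolerates null elements. */
final class DeepHashCodeNullDemo {

  /** Hashes a record-like tuple in which the headers slot may be null. */
  static int hash(
      String topic, int partition, long offset, long timestamp, Object headers, Object kv) {
    return Arrays.deepHashCode(new Object[] {topic, partition, offset, timestamp, headers, kv});
  }

  public static void main(String[] args) {
    // No exception is thrown for the null headers slot.
    System.out.println(hash("topic", 0, 0L, 0L, null, "kv"));
  }
}
```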




Issue Time Tracking
---

Worklog Id: (was: 91844)
Time Spent: 3.5h  (was: 3h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91845=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91845
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182160126
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -94,7 +96,7 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
  -150_000L,
  -120_000L,
  -140_000L,
-  100_000L,  // <<< timestamp is in future
+ 100_000L, // <<< timestamp is in future
 
 Review comment:
   Remove?




Issue Time Tracking
---

Worklog Id: (was: 91845)
Time Spent: 3h 40m  (was: 3.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91836
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182160163
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -79,7 +81,7 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
 -150_000L,
 -120_000L,
 -140_000L,
--100_000L,  // <<< Max timestamp
+-100_000L, // <<< Max timestamp
 
 Review comment:
   Remove?




Issue Time Tracking
---

Worklog Id: (was: 91836)
Time Spent: 2.5h  (was: 2h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91577=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91577
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 02:42
Start Date: 17/Apr/18 02:42
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181938653
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. 
Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   I see. That is correct; a compilation error is fine. All the kafka-client 
versions before 0.10.1 are already deprecated [1]. If a test wants to test 
kafka headers, it probably makes sense to depend on a kafka-client version that 
has headers. 
   
   
[1]https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L624




Issue Time Tracking
---

Worklog Id: (was: 91577)
Time Spent: 2h 10m  (was: 2h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91576=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91576
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 02:30
Start Date: 17/Apr/18 02:30
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181937184
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. 
Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   I have removed the additional classes. Yes, I understand that master won't 
compile, but I am referring to compiling my test program. What I mean to say is 
if the test program (which depends on beam-kafka) calls 
KafkaRecord.getHeaders(), this raises a compile error. Is this what we want? 
   
   It seems a bit counter-intuitive to expose a method such as getHeaders() and 
not be able to compile the *test* program when using it.




Issue Time Tracking
---

Worklog Id: (was: 91576)
Time Spent: 2h  (was: 1h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91575=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91575
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 02:28
Start Date: 17/Apr/18 02:28
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181937184
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. 
Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   Yes, I understand. I have removed the additional classes. What I mean to say 
is if the test program (which depends on beam-kafka) calls 
KafkaRecord.getHeaders(), this raises a compile error. Is this what we want? 
   
   It seems a bit counter-intuitive to expose a method such as getHeaders() and 
not be able to compile the program when using it.




Issue Time Tracking
---

Worklog Id: (was: 91575)
Time Spent: 1h 50m  (was: 1h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91566=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91566
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 01:42
Start Date: 17/Apr/18 01:42
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181931824
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   As noted in the previous two comments, we don't need to support compiling with 
0.9. Even master would not compile with 0.9. We can require 0.11.x 
(whichever version first included headers) for compilation and to run tests.
   If you get the code working with Beam's default version of kafka-clients 
(I think 1.0.x), that is good enough. During the review we can check whether it 
would work with older versions of kafka-clients at runtime.




Issue Time Tracking
---

Worklog Id: (was: 91566)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91564=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91564
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 17/Apr/18 01:27
Start Date: 17/Apr/18 01:27
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181930163
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   Hmm, when adding ```Headers``` to ```KafkaRecord```, I was not able to 
instantiate ```KafkaRecord``` in my test program. Compilation failed with:
   
   ```
   Error:(8, 5) java: cannot access org.apache.kafka.common.header.Headers
 class file for org.apache.kafka.common.header.Headers not found
   ```
   Tried with both JDK 1.7 and 1.8. This is using kafka-clients 0.9.0.0.




Issue Time Tracking
---

Worklog Id: (was: 91564)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91533=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91533
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 16/Apr/18 22:56
Start Date: 16/Apr/18 22:56
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181909253
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   We can require a recent Kafka version to compile and run tests, but we want 
KafkaIO to work with older versions of kafka-clients at runtime. That's 
why we have the runtime reflection checks. 
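
   The runtime reflection check mentioned above can be sketched roughly as 
follows. This is a minimal illustration, not KafkaIO's actual code: the class 
`RuntimeFeatureCheck` and the helper `hasMethod` are hypothetical names. The 
only Kafka detail it relies on is that `ConsumerRecord.headers()` exists in 
kafka-clients 0.11 and later. Without kafka-clients on the classpath the check 
simply reports `false`.

```java
public class RuntimeFeatureCheck {

  /**
   * Returns true if the named class can be loaded and exposes a public
   * no-argument method with the given name. Hypothetical helper for this sketch.
   */
  public static boolean hasMethod(String className, String methodName) {
    try {
      Class<?> cls = Class.forName(className);
      cls.getMethod(methodName); // throws NoSuchMethodException if absent
      return true;
    } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
      // Class or method is missing in the library version found at runtime.
      return false;
    }
  }

  public static void main(String[] args) {
    // Probe once, then branch on the result instead of calling headers() blindly.
    boolean hasHeaders =
        hasMethod("org.apache.kafka.clients.consumer.ConsumerRecord", "headers");
    System.out.println("headers supported at runtime: " + hasHeaders);
  }
}
```

   Probing by name keeps the check free of any compile-time reference to the 
newer type, so the probing code itself loads on an old classpath.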




Issue Time Tracking
---

Worklog Id: (was: 91533)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91142=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91142
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 16/Apr/18 01:50
Start Date: 16/Apr/18 01:50
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181608020
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   We want to avoid runtime errors, not compile-time errors. As long as a user 
running an older version of Kafka does not call getHeaders(), it will 
be fine. I tried it in a test program and it worked.




Issue Time Tracking
---

Worklog Id: (was: 91142)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91046=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91046
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 14/Apr/18 01:44
Start Date: 14/Apr/18 01:44
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181537606
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -75,28 +82,22 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
 
 // (1) Test simple case : watermark == max_timesatmp - max_delay
 
-List input = ImmutableList.of(-200_000L,
--150_000L,
--120_000L,
--140_000L,
--100_000L,  // <<< Max timestamp
--110_000L);
+List input =
+ImmutableList.of(
+-200_000L, -150_000L, -120_000L, -140_000L, -100_000L, // <<< Max timestamp
+-110_000L);
 assertThat(getTimestampsForRecords(policy, now, input), is(input));
 
 // Watermark should be max_timestamp - maxDelay
-assertThat(policy.getWatermark(ctx), is(now
 
 Review comment:
   Sure thing, must be some IDE setting I missed.




Issue Time Tracking
---

Worklog Id: (was: 91046)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91045=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91045
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 14/Apr/18 01:43
Start Date: 14/Apr/18 01:43
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181537565
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   > Add 'Headers' field to KafkaRecord
   
   Is 'Headers' referring to ```org.apache.kafka.common.header.Headers```? If 
so, how would this approach work for Kafka 0.9.x (note that this interface is 
available only from Kafka 0.11.x onward)? Wouldn't this lead to a compile-time error?
   
   The main reason we use our own versions of ```Header```, etc. is to avoid 
such compile-time errors. 




Issue Time Tracking
---

Worklog Id: (was: 91045)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91019=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91019
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 13/Apr/18 23:28
Start Date: 13/Apr/18 23:28
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181529876
 
 

 ##
 File path: 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
 ##
 @@ -75,28 +82,22 @@ public void testCustomTimestampPolicyWithLimitedDelay() {
 
 // (1) Test simple case : watermark == max_timesatmp - max_delay
 
-List input = ImmutableList.of(-200_000L,
--150_000L,
--120_000L,
--140_000L,
--100_000L,  // <<< Max timestamp
--110_000L);
+List input =
+ImmutableList.of(
+-200_000L, -150_000L, -120_000L, -140_000L, -100_000L, // <<< Max timestamp
+-110_000L);
 assertThat(getTimestampsForRecords(policy, now, input), is(input));
 
 // Watermark should be max_timestamp - maxDelay
-assertThat(policy.getWatermark(ctx), is(now
 
 Review comment:
   Please remove all of these spurious changes.




Issue Time Tracking
---

Worklog Id: (was: 91019)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91018=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91018
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 13/Apr/18 23:28
Start Date: 13/Apr/18 23:28
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r181529808
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaHeader.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.header.Header}. Included here in order
+ * to support older Kafka versions (0.9.x).
+ */
+public interface KafkaHeader {
 
 Review comment:
   I think strictly we don't need our own versions of `Header`, `Headers`, etc. 
How about this: 
- Add 'Headers' field to KafkaRecord. We ensure the field is accessed only 
if `consumerSpEL.hasHeaders` is true.
- Add a `Headers getHeaders()` method to KafkaRecord, which throws an 
exception if `hasHeaders` is false.
   
   I think we could have done something like this with `TimestampType` too. 
WDYT?
   It is a bit hacky, but I guess it is better than replicating more of 
ConsumerRecord functionality. 
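
   The two steps proposed above might look roughly like this. It is only a 
sketch under stated assumptions: `GuardedRecord` is a hypothetical stand-in 
for KafkaRecord, the headers are typed as `Object` (instead of 
`org.apache.kafka.common.header.Headers`) so the example compiles without 
kafka-clients, and the `hasHeaders` flag is passed in rather than read from 
`consumerSpEL`. The choice of `UnsupportedOperationException` is also an 
assumption, since the comment does not name an exception type.

```java
public class GuardedRecord {
  private final String value;
  // Stand-in for org.apache.kafka.common.header.Headers (assumption of this sketch).
  private final Object headers;
  // Result of a runtime capability check, supplied by the caller here.
  private final boolean hasHeaders;

  public GuardedRecord(String value, Object headers, boolean hasHeaders) {
    this.value = value;
    // Touch the headers only when the runtime check says they are supported.
    this.headers = hasHeaders ? headers : null;
    this.hasHeaders = hasHeaders;
  }

  public String getValue() {
    return value;
  }

  /** Returns the headers, or throws if the runtime library does not support them. */
  public Object getHeaders() {
    if (!hasHeaders) {
      throw new UnsupportedOperationException(
          "Headers are not supported by the kafka-clients version found at runtime");
    }
    return headers;
  }

  public static void main(String[] args) {
    GuardedRecord supported = new GuardedRecord("v", "trace-id=42", true);
    System.out.println(supported.getHeaders());

    GuardedRecord unsupported = new GuardedRecord("v", null, false);
    try {
      unsupported.getHeaders();
    } catch (UnsupportedOperationException e) {
      System.out.println("getHeaders() rejected: " + e.getMessage());
    }
  }
}
```

   Callers on an older client that never call `getHeaders()` are unaffected. 
Callers that do call it fail with an explicit message.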




Issue Time Tracking
---

Worklog Id: (was: 91018)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=90955=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90955
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 13/Apr/18 20:08
Start Date: 13/Apr/18 20:08
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on issue #5111: BEAM-4038: Support 
Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#issuecomment-381247716
 
 
   @rangadi, let me know what you think. Thanks!




Issue Time Tracking
---

Worklog Id: (was: 90955)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=90275=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90275
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 12/Apr/18 03:29
Start Date: 12/Apr/18 03:29
Worklog Time Spent: 10m 
  Work Description: gkumar7 opened a new pull request #5111: BEAM-4038: 
Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111
 
 
   Adds read support for Kafka headers. These changes have been tested with 
an older version of Kafka (0.9.x) as well as the latest version (1.0.x). 
   
   Added KafkaHeader, KafkaHeaders, KafkaRecordHeader, and KafkaRecordHeaders for 
backwards compatibility.




Issue Time Tracking
---

Worklog Id: (was: 90275)
Time Spent: 10m
Remaining Estimate: 0h
