[GitHub] yanghua commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2018-08-27 Thread GitBox
yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034601
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ##
 @@ -206,6 +208,9 @@ protected KafkaConsumerCallBridge createCallBridge() {
return new KafkaConsumerCallBridge();
}
 
+   protected Iterable<Map.Entry<String, byte[]>> headersOf(ConsumerRecord<byte[], byte[]> record) {
+   return Collections.emptyList();
+   }
 
 Review comment:
  Inserting a new line after this would look better.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2018-08-27 Thread GitBox
yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034632
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -54,4 +55,20 @@
 * @return True, if the element signals end of stream, false otherwise.
 */
boolean isEndOfStream(T nextElement);
+
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
 
 Review comment:
  Missing description for the topic parameter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2018-08-27 Thread GitBox
yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034555
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 ##
 @@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
}
}
 
+   /**
+* Kafka 0.11 specific test, ensuring Kafka Headers are properly 
written to and read from Kafka.
+*/
+   @Test(timeout = 60000)
+   public void testHeaders() throws Exception {
+   final String topic = "headers-topic";
+   final long testSequenceLength = 127L;
+   createTestTopic(topic, 3, 1);
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+   env.getConfig().disableSysoutLogging();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   DataStream<Long> testSequence = env.addSource(new 
SourceFunction<Long>() {
+   private static final long serialVersionUID = 1L;
+   boolean running = true;
+
+   @Override
+   public void run(SourceContext<Long> ctx) throws 
Exception {
+   long i = 0;
+   while (running) {
+   ctx.collectWithTimestamp(i, i * 2);
+   if (i++ == testSequenceLength) {
+   running = false;
+   }
+   }
+   }
+
+   @Override
+   public void cancel() {
+   running = false;
+   }
+   });
+
+   FlinkKafkaProducer011<Long> producer = new 
FlinkKafkaProducer011<>(topic,
+   new TestHeadersKeyedSerializationSchema(topic), 
standardProps, Optional.empty());
+   testSequence.addSink(producer).setParallelism(3);
+   env.execute("Produce some data");
+
+   // Now let's consume data and check that headers deserialized 
correctly
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+   env.getConfig().disableSysoutLogging();
+   
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   FlinkKafkaConsumer011<TestHeadersElement> kafkaSource = new 
FlinkKafkaConsumer011<>(topic, new 
TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+   env.addSource(kafkaSource).addSink(new 
TestHeadersElementValid());
+   env.execute("Consume again");
+
+   deleteTestTopic(topic);
+   }
+
+   /**
+* Element consisting of key, value and headers represented as list of 
tuples: key, list of Bytes.
+*/
+   public static class TestHeadersElement extends Tuple3<byte[], byte[], List<Tuple2<String, List<Byte>>>> {
+
+   }
+
+   /**
+* Generate "headers" for given element.
+* @param element - sequence element
+* @return headers
+*/
+   private static Iterable<Map.Entry<String, byte[]>> headersFor(Long 
element) {
+   final long x = element;
+   return Arrays.asList(
+   new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+   (byte) ((x >>> 8) & 0xFF),
+   (byte) ((x) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("low", new 
byte[]{
+   (byte) ((x >>> 24) & 0xFF),
+   (byte) ((x >>> 16) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+   (byte) ((x >>> 40) & 0xFF),
+   (byte) ((x >>> 32) & 0xFF)
+   }),
+   new AbstractMap.SimpleImmutableEntry<>("high", new 
byte[]{
+   (byte) ((x >>> 56) & 0xFF),
+   (byte) ((x >>> 48) & 0xFF)
+   })
+   );
+   }
+
+   /**
+* Convert headers into list of tuples representation. List of tuples 
is more convenient to use in
+* assert expressions, because they have equals
+* @param headers - headers
+* @return list of tuples(string, list of Bytes)
+*/
+   private static List<Tuple2<String, List<Byte>>> 
headersAsList(Iterable<Map.Entry<String, byte[]>> headers) {
+