yanghua commented on a change in pull request #6615: [FLINK-8354]
[flink-connectors] Add ability to access and provide Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034555
##
File path:
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
##
@@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
}
}
+ /**
+* Kafka 0.11 specific test, ensuring Kafka Headers are properly
written to and read from Kafka.
+*/
+ @Test(timeout = 6)
+ public void testHeaders() throws Exception {
+ final String topic = "headers-topic";
+ final long testSequenceLength = 127L;
+ createTestTopic(topic, 3, 1);
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ DataStream testSequence = env.addSource(new
SourceFunction() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+
+ @Override
+ public void run(SourceContext ctx) throws
Exception {
+ long i = 0;
+ while (running) {
+ ctx.collectWithTimestamp(i, i * 2);
+ if (i++ == testSequenceLength) {
+ running = false;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ FlinkKafkaProducer011 producer = new
FlinkKafkaProducer011<>(topic,
+ new TestHeadersKeyedSerializationSchema(topic),
standardProps, Optional.empty());
+ testSequence.addSink(producer).setParallelism(3);
+ env.execute("Produce some data");
+
+ // Now let's consume data and check that headers deserialized
correctly
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ FlinkKafkaConsumer011 kafkaSource = new
FlinkKafkaConsumer011<>(topic, new
TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+ env.addSource(kafkaSource).addSink(new
TestHeadersElementValid());
+ env.execute("Consume again");
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+* Element consisting of key, value and headers represented as list of
tuples: key, list of Bytes.
+*/
+ public static class TestHeadersElement extends Tuple3>>> {
+
+ }
+
+ /**
+* Generate "headers" for given element.
+* @param element - sequence element
+* @return headers
+*/
+ private static Iterable> headersFor(Long
element) {
+ final long x = element;
+ return Arrays.asList(
+ new AbstractMap.SimpleImmutableEntry<>("low", new
byte[]{
+ (byte) ((x >>> 8) & 0xFF),
+ (byte) ((x) & 0xFF)
+ }),
+ new AbstractMap.SimpleImmutableEntry<>("low", new
byte[]{
+ (byte) ((x >>> 24) & 0xFF),
+ (byte) ((x >>> 16) & 0xFF)
+ }),
+ new AbstractMap.SimpleImmutableEntry<>("high", new
byte[]{
+ (byte) ((x >>> 40) & 0xFF),
+ (byte) ((x >>> 32) & 0xFF)
+ }),
+ new AbstractMap.SimpleImmutableEntry<>("high", new
byte[]{
+ (byte) ((x >>> 56) & 0xFF),
+ (byte) ((x >>> 48) & 0xFF)
+ })
+ );
+ }
+
+ /**
+* Convert headers into list of tuples representation. List of tuples
is more convenient to use in
+* assert expressions, because they have equals
+* @param headers - headers
+* @return list of tuples(string, list of Bytes)
+*/
+ private static List>>
headersAsList(Iterable> headers) {
+