[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3314




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104311996
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -381,6 +381,10 @@ else if (partitionsRemoved) {
 
 							partitionsIterator.remove();
 							continue partitionsLoop;
 						}
+
+						if (value == null) {
+							continue;
+						}
--- End diff --

Would it make sense to do the `null` checking inside `emitRecord(...)`?
Otherwise, we wouldn't be updating the offset state for skipped records, and
therefore not accounting for them as "already processed".

I don't think it really matters, since we aren't outputting anything anyway,
but I see at least one minor advantage that might deserve changing it: if we
fail during a long run of skipped records, we won't waste any overhead
re-processing them on restore.
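
For reference, a minimal sketch of what that alternative could look like (hedged: the signature and checkpoint-lock idiom follow the surrounding fetcher code, but this is not the PR's actual change):

```
// Sketch only: move the null check into emitRecord(...) so the offset state
// still advances for skipped records (assumed signature, not the final code).
protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
	synchronized (checkpointLock) {
		if (record != null) {
			sourceContext.collect(record);
		}
		// advancing the offset even for skipped (null) records means a restore
		// will not re-read a long run of corrupted messages
		partitionState.setOffset(offset);
	}
}
```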




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104312079
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -419,6 +424,164 @@ public void run() {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}
 
+	@Test
+	public void testSkipCorruptedMessage() throws Exception {
+
+		// - some test data -
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// - the test consumer -
+
+		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+			@Override
+			public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// - build a fetcher -
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+			@Override
+			public String deserialize(byte[] messageKey, byte[] message,
+										String topic, int partition, long offset) throws IOException {
+				return offset == 15 ? null : new String(message);
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return "end".equals(nextElement);
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			partitionsWithInitialOffsets,
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			true, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			false);
+
+		// - run the fetcher -
+
+		fetcher.runFetchLoop();
+		assertEquals(1, results.size());
+	}
+
+	@Test
+	public void testNullAsEOF() throws Exception {
--- End diff --

I'm not sure this test is necessary. It's essentially just testing that the
fetcher stops when `isEndOfStream` returns `true`; whether the condition
happens to be `element == null` seems irrelevant to what's being tested.

We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.






[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-04 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104227652
  
--- Diff: docs/dev/connectors/kafka.md ---
@@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+There are two possible design choice when the `DeserializationSchema` encounters a corrupted message. It can
--- End diff --

choices
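
For context, the "skip" choice could look like this minimal sketch (a hypothetical example, not part of the PR; `LongParsingSchema` is an invented name):

```
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

// Hypothetical schema illustrating the "skip" choice: return null for a
// corrupted message so the consumer drops it instead of failing the job.
public class LongParsingSchema extends AbstractDeserializationSchema<Long> {
	@Override
	public Long deserialize(byte[] message) {
		try {
			return Long.parseLong(new String(message, StandardCharsets.UTF_8));
		} catch (NumberFormatException e) {
			return null; // corrupted message: skip it
		}
	}
}
```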




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-28 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103405663
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichKeyedDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set)
+	 * @param message The message, as a byte array. (null if the message was empty or deleted)
+	 * @param partition The partition the message has originated from
+	 * @param offset the offset of the message in the original source (for example the Kafka offset)
+	 * @param collector the user-provided collector that deserializes the bytes into zero or more
+	 *                  records.
+	 *
+	 * @return The deserialized message as an object.
--- End diff --

The method doesn't return anything.
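
i.e., the stale `@return` tag can simply be dropped; a sketch keeping the PR's signature:

```
	 * @param collector the user-provided collector that receives the deserialized
	 *                  bytes as zero or more records.
	 */
	void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
			Collector<T> collector) throws IOException;
```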




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103399810
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

I would perhaps put it in `org.apache.flink.streaming.kafka.serialization`
under `flink-connector-kafka-base`.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-27 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103339489
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

Can you please suggest where it should be put?




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102902685
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 ---
@@ -119,7 +119,7 @@ public Void answer(InvocationOnMock invocation) {
 		@SuppressWarnings("unchecked")
 		SourceContext<String> sourceContext = mock(SourceContext.class);
 		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+		RichKeyedDeserializationSchemaWrapper<String> schema = new RichKeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
--- End diff --

This file will conflict with the current `master`, because I recently pushed a
hotfix to `master` that fixes the indentation of this file (previously it
incorrectly used spaces to indent instead of tabs). Sorry about this!




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900238
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchemaWrapper.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+
+public class RichKeyedDeserializationSchemaWrapper<T> implements RichKeyedDeserializationSchema<T> {
--- End diff --

Can you also include Javadocs for this class?




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102898307
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -422,6 +429,99 @@ public void run() {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}
 
+	@Test
+	public void testRichDeserializationSchema() throws Exception {
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+		final byte[] endPayload = "end".getBytes(StandardCharsets.UTF_8);
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, endPayload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// - the test consumer -
+
+		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+			@Override
+			public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// - build a fetcher -
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		RichKeyedDeserializationSchema<String> schema = new RichKeyedDeserializationSchema<String>() {
+			@Override
+			public void deserialize(
+					byte[] messageKey, byte[] message, String topic, int partition,
+					long offset, Collector<String> collector) throws IOException {
+				if (offset != 16) {
+					collector.collect(new String(message));
+				}
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return nextElement.equals("end");
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			topics,
+			null, /* no restored state */
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			false, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			StartupMode.GROUP_OFFSETS,
+			false);
+
+		// - run the fetcher -
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
--- End diff --

We have a nice utility, `CheckedThread`, that serves exactly this purpose
(catching errors and storing a reference to them).
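
A hedged sketch of that refactor, assuming Flink's `org.apache.flink.core.testutils.CheckedThread` (whose `sync()` rethrows any exception the thread raised):

```
// Replaces the Thread + AtomicReference<Throwable> pair with CheckedThread.
final CheckedThread fetcherRunner = new CheckedThread("fetcher runner") {
	@Override
	public void go() throws Exception {
		fetcher.runFetchLoop();
	}
};
fetcherRunner.start();

// ... after signaling the fetcher to shut down:
fetcherRunner.sync(); // waits for the thread and rethrows anything it threw
```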




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102898788
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
--- End diff --

The name of the class is `RichKeyedDeserializationSchema`, but the Javadocs
mention `RichDeserializationSchema`.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900820
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
 	 * @param props
 	 *   The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+	public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
 		super(topics, deserializer);
--- End diff --

So, instead of changing the constructor, we should still do
`super(topics, new RichKeyedDeserializationSchemaWrapper<>(deserializer))`
here.
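
Sketched out, the backward-compatible constructor would look roughly like this (hedged; everything beyond the `super` call is elided):

```
// Keep the existing public signature; bridge the legacy schema internally.
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
	super(topics, new RichKeyedDeserializationSchemaWrapper<>(deserializer));
	// ... rest of the constructor unchanged
}
```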




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102901123
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 ---
@@ -171,8 +171,9 @@ private static String getResourceFilename(String filename) {
 	private final List<KafkaTopicPartition> partitions;
 
 	@SuppressWarnings("unchecked")
-	DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
-		super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+	DummyFlinkKafkaConsumer(
+			List<KafkaTopicPartition> partitions) {
--- End diff --

If it's just one parameter, I don't think we need a new line.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900986
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
 			final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
 			// get the records for each topic partition
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+			for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
 
 				List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 					records.records(partition.getKafkaPartitionHandle());
 
-				for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-					final T value = deserializer.deserialize(
-						record.key(), record.value(),
-						record.topic(), record.partition(), record.offset());
-
-					if (deserializer.isEndOfStream(value)) {
-						// end of stream signaled
-						running = false;
-						break;
-					}
-
-					// emit the actual record. this also updates offset state atomically
-					// and deals with timestamps and watermark generation
-					emitRecord(value, partition, record.offset(), record);
+				for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+					final Collector<T> collector = new Collector<T>() {
+						@Override
+						public void collect(T value) {
+							if (deserializer.isEndOfStream(value)) {
+								// end of stream signaled
+								running = false;
+							} else {
+								// emit the actual record. this also updates offset state atomically
+								// and deals with timestamps and watermark generation
+								try {
+									emitRecord(value, partition, record.offset(), record);
+								} catch (Exception e) {
+									throw new RuntimeException(e);
+								}
+							}
+						}
+
+						@Override
+						public void close() {
+						}
+					};
+
+					deserializer.deserialize(
+						record.key(), record.value(),
+						record.topic(), record.partition(), record.offset(), collector);
--- End diff --

The formatting for the list of arguments here could be nicer. Perhaps one 
argument per line?
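
i.e., something like:

```
deserializer.deserialize(
	record.key(),
	record.value(),
	record.topic(),
	record.partition(),
	record.offset(),
	collector);
```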




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102898901
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
--- End diff --

I'm not sure why we need to link to `KeyedSerializationSchema` in the 
Javadocs for the new serialization schema.
From what I know, we're going to completely replace it, correct?




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102901299
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1236,10 +1237,11 @@ public Tuple2WithTopicSchema(ExecutionConfig ec) {
}
 
@Override
-	public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+	public void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
+			Collector<Tuple3<Integer, Integer, String>> collector) throws IOException {
--- End diff --

Same here: the indentation formatting seems off.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102896783
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
 	 * @param props
 	 *   The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+	public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
--- End diff --

This will break user code. We'll need a proper usage migration here.

We have a separate JIRA that aims at deprecating the current Kafka consumer
constructors: https://issues.apache.org/jira/browse/FLINK-5704. The migration
to the new flat-map deserializer can be included there.

Perhaps for this PR, we should just use your
`RichKeyedDeserializationSchemaWrapper` as a "behaviour bridge" from the
original deserialization schema to the new one, and not change the original
constructors or add new ones yet, so that we don't overlap with and
complicate things for FLINK-5704.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102899179
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set)
+	 * @param message The message, as a byte array. (null if the message was empty or deleted)
+	 * @param partition The partition the message has originated from
+	 * @param offset the offset of the message in the original source (for example the Kafka offset)
+	 *
+	 * @return The deserialized message as an object.
+	 */
+	void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
+			Collector<T> collector) throws IOException;
--- End diff --

The indentation of the parameters here seems a bit off.
Now that the parameter list is quite lengthy, it might be good style to have
one parameter per line.
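
i.e., something like:

```
void deserialize(
		byte[] messageKey,
		byte[] message,
		String topic,
		int partition,
		long offset,
		Collector<T> collector) throws IOException;
```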




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102896891
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -121,7 +122,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
 	 * @param props
 	 *   The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+	public FlinkKafkaConsumer010(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
--- End diff --

Same as in the comment in `FlinkKafkaConsumer08`: this breaks user code.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102897911
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -422,6 +429,99 @@ public void run() {
assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
}
 
+   @Test
+   public void testRichDeserializationSchema() throws Exception {
--- End diff --

I think we should also enhance this test to cover the behaviour with multiple
`collect`s per record.
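
For instance, a schema along these lines (a sketch, not part of the PR) would exercise that path:

```
// Sketch: emit two elements per Kafka record to exercise multiple collects.
RichKeyedDeserializationSchema<String> multiCollectSchema = new RichKeyedDeserializationSchema<String>() {
	@Override
	public void deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset, Collector<String> collector) throws IOException {
		String value = new String(message, StandardCharsets.UTF_8);
		collector.collect(value + "-first");
		collector.collect(value + "-second");
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return nextElement.startsWith("end");
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
};
```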




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900064
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

Since this is now a very Kafka-specific class, I think this is a good time to
change the package path to `org.apache.flink.streaming.kafka.serialization`.

The original `KeyedDeserializationSchema` was placed under
`o.a.f.s.util.serialization` because it was wrongly packaged in another module
before, and was moved to `flink-connector-kafka-base` under the same package
path to avoid breaking user code.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881632
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

@haohui, if you don't mind, I would also wait for @rmetzger to take another
look at the new proposals here before you jump back into the code.
This part is quite critical for the Flink Kafka consumer's exactly-once
guarantee, so another pair of eyes on it will be safer.

I would also like to do a thorough pass over your code to see if there are
other problems, so you can work on all of those together.

Is that ok for you? Sorry for some more waiting.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881264
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

I see what you are saying. The trade-off here is handing off the objects one
more time, but I think it's okay. I'll update the PR accordingly.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881092
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

@haohui hmm this seems a bit odd to me. I think it should be achievable.

```
// the buffer; this can be shared
final List<T> bufferedElements = new LinkedList<>();
// BufferCollector is an implementation of Collector that adds collected elements to bufferedElements; this can be shared
final BufferCollector<T> collector = new BufferCollector<>(bufferedElements);

...

for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
	deserializer.deserialize(
		record.key(), record.value(), record.topic(),
		record.partition(), record.offset(), collector);

	emitRecords(bufferedElements, partitionState, record.offset(), record);

	// after the elements for the record have been emitted, empty out the buffer
	bufferedElements.clear();
}
```

Doesn't this work? I haven't really tried this hands-on, so I might be 
overlooking something. Let me know what you think :)
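
For completeness, a minimal sketch of the `BufferCollector` described above (a hypothetical helper, not in the PR):

```
// Collects deserialized elements into a shared buffer instead of emitting them.
final class BufferCollector<T> implements Collector<T> {

	private final List<T> buffer;

	BufferCollector(List<T> buffer) {
		this.buffer = buffer;
	}

	@Override
	public void collect(T element) {
		buffer.add(element);
	}

	@Override
	public void close() {
		// nothing to release
	}
}
```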




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102830609
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

Good catch, @tzulitai !

I tried the buffer approach and had no luck. The problem is that calling
`emitRecord` needs to pass in both the offset and the record itself -- the
record is used to extract the timestamp in the Kafka 0.10 consumers. So the
buffer would need to hold both the deserialized value and the record -- it
cannot solve the problem of having a collector per record.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102668004
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

What I think we should do to solve this correctly:

Buffer the elements collected from the `deserialize` call. The
`Collector.collect` implementation should simply add the collected element to
the buffer, not emit it immediately.

After `deserialize` returns, call `emitRecord` once with all the elements in
the buffer and the original record's offset. This, of course, means we need to
slightly change the `emitRecord` implementation, to something like:
```
void emitRecord(List<T> records, KafkaTopicPartitionState<KPH> partitionState, long offset) {
	synchronized (checkpointLock) {
		for (T record : records) {
			sourceContext.collect(record);
		}
		partitionState.setOffset(offset);
	}
}
```

After this, we proceed with the next record and repeat. Note that the emitting
of all elements produced from the record at offset 100L and the update of the
offset state to 100L happen atomically, synchronized on the checkpoint lock,
so we can make sure that a checkpoint barrier will only come either before or
after all the produced records of offset 100, and never in between.

I think we should also be able to avoid a per-record `Collector` with this
solution. We can reuse one `Collector` and provide it to the `deserializer`
for every record, because it's now only a means of collecting elements into
the internal buffer, and we're not calling `emitRecords` in it.




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102665687
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

Moving the discussion back a bit:

I don't think this implementation works correctly with exactly-once and the
way we checkpoint the consumer's partition offset state.

The problem is that `emitRecord` updates the offset state. With the changes
here, that means we consider a record fully processed as soon as the collector
collects something.

For example, let's say the deserializer calls `collect` 3 times for elements
deserialized from record R (at offset 100L) before `deserialize` returns. As
soon as the first element is collected, the state is updated to "finished
processing offset 100L". If a checkpoint is triggered now, and we later
restore from it, we will skip the remaining 2 elements that were yet to be
collected.







[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-22 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102542018
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

@StephanEwen What is your opinion on solving this problem?





[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-21 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102299038
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

Totally agree. I played around a little bit, and it might require some
trade-offs here.

The problem is that `emitRecord()` needs the state of each record (e.g., topic
partition, offset, etc.). The state can either be passed inside a closure
(like the new instance of the `Collector`) or passed through arguments. I see
three possibilities here:

1. Create a new instance of `Collector` for every record. The JVM may or may
not be able to optimize it away. A trace-based JVM should be able to, but I'm
not sure about class-based JVMs.

2. Expose the internal state in the `collect()` call: `collect()` takes
additional parameters such as the offset and partition state. This reduces the
GC overhead but also hinders changing the implementation later.

3. Create a new interface like `Optional<T> deserialize(byte[] messageKey, ...)`
(or `void deserialize(byte[] messageKey, ..., AtomicReference<T> result)` to
optimize away the cost of the `Optional` class). It results in slightly more
complex APIs, but it probably has the best trade-off between performance and
API compatibility.

What do you think?
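
Option 3 sketched as signatures (hedged; the parameter list mirrors the existing `KeyedDeserializationSchema`):

```
// Variant (a): at most one record per message, absence signaled via Optional.
Optional<T> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
		throws IOException;

// Variant (b): avoids the Optional allocation by writing into a holder.
void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
		AtomicReference<T> result) throws IOException;
```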





[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-20 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102048656
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
 			final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
 			// get the records for each topic partition
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+			for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
 
 				List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 					records.records(partition.getKafkaPartitionHandle());
 
-				for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-					final T value = deserializer.deserialize(
-						record.key(), record.value(),
-						record.topic(), record.partition(), record.offset());
-
-					if (deserializer.isEndOfStream(value)) {
-						// end of stream signaled
-						running = false;
-						break;
-					}
-
-					// emit the actual record. this also updates offset state atomically
-					// and deals with timestamps and watermark generation
-					emitRecord(value, partition, record.offset(), record);
+				for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+					final Collector<T> collector = new Collector<T>() {
--- End diff --

Same question as in the Kafka 0.8 impl




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-20 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102048475
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

I'm not sure about the performance implications of this. The JVM will create a
`Collector` instance for each record read from Kafka. I wonder if we can
re-use one collector instance here.

Also, I wonder if we need to use this `Collector` implementation at all, with
a `close()` method we are not using and an exception we are turning into a
`RuntimeException`. Maybe we should let `collect` throw an exception?




[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-14 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3314

[FLINK-3679] DeserializationSchema should handle zero or more outputs

This PR adds a new interface, `RichKeyedDeserializationSchema`, to enable the
deserializer to produce zero or more outputs per input. The main use case is
skipping corrupted messages in the Kafka stream.

Feedback (especially on backward compatibility) is highly appreciated.
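
As a hedged usage sketch of the new interface (names follow the PR; the filtering condition is an invented example):

```
// A schema that drops corrupted messages by simply not calling collect()
// for them (zero outputs per input), per the PR's intent.
RichKeyedDeserializationSchema<String> schema = new RichKeyedDeserializationSchema<String>() {
	@Override
	public void deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset, Collector<String> collector) throws IOException {
		if (message != null && message.length > 0) {
			collector.collect(new String(message, StandardCharsets.UTF_8));
		}
		// corrupted/empty message: emit nothing
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
};
```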

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-3679

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3314


commit 7728acb3bc00a12a7552706be569710fbfdbd200
Author: Haohui Mai 
Date:   2017-02-14T22:19:29Z

[FLINK-3679] DeserializationSchema should handle zero or more outputs for 
every input.



