[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2789


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88273352
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -143,133 +123,26 @@ public Kafka09Fetcher(
 
@Override
public void runFetchLoop() throws Exception {
--- End diff --

Ok, I agree to be safe.

Also, I just realized that "end of stream" shouldn't lead to the 
`ClosedException`, only "cancellation", "fetcher error", "consumer error", and 
(hopefully not) any other stuff we overlooked will. So, basically, like what 
you said, only abnormal terminations. In that case, let's keep it this way.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88264932
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -143,133 +123,26 @@ public Kafka09Fetcher(
 
@Override
public void runFetchLoop() throws Exception {
--- End diff --

To be safe, I think the `ClosedException`s should be re-thrown, as should 
all others.
This covers the case where we overlook something and the consumer thread 
closes the handover by itself or so. Any abnormal termination of the fetch loop 
should result in an exception - that is the safest we can do.
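
A minimal sketch of the rule discussed here, under stated assumptions: `HandoverSketch`, `FetchLoopSketch` and the `END_OF_STREAM` marker are hypothetical stand-ins and not the classes from this pull request. It only illustrates that a regular end of stream returns normally, while a closed handover surfaces as an exception to the caller.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified stand-in for the PR's Handover (unbounded here for brevity). */
final class HandoverSketch {

    /** Signals that the handover was closed, which this sketch treats as abnormal termination. */
    static final class ClosedException extends Exception {
        ClosedException() {
            super("handover closed");
        }
    }

    private final Queue<String> queue = new ArrayDeque<>();
    private boolean closed;

    synchronized void produce(String record) throws ClosedException {
        if (closed) {
            throw new ClosedException();
        }
        queue.add(record);
        notifyAll();
    }

    synchronized String pollNext() throws ClosedException, InterruptedException {
        while (queue.isEmpty() && !closed) {
            wait();
        }
        if (closed) {
            throw new ClosedException();
        }
        return queue.poll();
    }

    synchronized void close() {
        closed = true;
        queue.clear();
        notifyAll();
    }
}

final class FetchLoopSketch {

    /** Hypothetical marker for a regular end of stream. */
    static final String END_OF_STREAM = "<end-of-stream>";

    /** Returns the number of emitted records; a ClosedException propagates to the caller. */
    static int runFetchLoop(HandoverSketch handover) throws Exception {
        int emitted = 0;
        while (true) {
            String record = handover.pollNext();
            if (END_OF_STREAM.equals(record)) {
                return emitted; // normal termination, no exception
            }
            emitted++; // stands in for emitRecord(...)
        }
    }
}
```

With this shape, the caller does not need to know why the handover was closed: anything other than a regular end of stream ends the loop with an exception.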




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88264489
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
+
+   /** The handover of data and exceptions between the consumer thread and 
the task thread */
+   private final Handover handover;
+
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+   /** The configuration for the Kafka consumer */
+   private final Properties kafkaProperties;
+
+   /** The partitions that this consumer reads from */ 
+   private final KafkaTopicPartitionState[] 
subscribedPartitions;
+
+   /** We get this from the outside to publish metrics. **/
+   private final MetricGroup kafkaMetricGroup;
+
+   /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
+   private final KafkaConsumerCallBridge consumerCallBridge;
+
+   /** The maximum number of milliseconds to wait for a fetch batch */
+   private final long pollTimeout;
+
+   /** Flag whether to add Kafka's metrics to the Flink metrics */
+   private final boolean useMetrics;
+
+   /** Reference to the Kafka consumer, once it is created */
+   private volatile KafkaConsumer consumer;
+
+   /** Flag to mark the main work loop as alive */
+   private volatile boolean running;
+
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
+
+   public KafkaConsumerThread(
+   Logger log,
+   Handover handover,
+   Properties kafkaProperties,
+   KafkaTopicPartitionState[] 
subscribedPartitions,
+   MetricGroup kafkaMetricGroup,
 

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88260220
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
+
+   /** The handover of data and exceptions between the consumer thread and 
the task thread */
+   private final Handover handover;
+
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+   /** The configuration for the Kafka consumer */
+   private final Properties kafkaProperties;
+
+   /** The partitions that this consumer reads from */ 
+   private final KafkaTopicPartitionState[] 
subscribedPartitions;
+
+   /** We get this from the outside to publish metrics. **/
+   private final MetricGroup kafkaMetricGroup;
+
+   /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
+   private final KafkaConsumerCallBridge consumerCallBridge;
+
+   /** The maximum number of milliseconds to wait for a fetch batch */
+   private final long pollTimeout;
+
+   /** Flag whether to add Kafka's metrics to the Flink metrics */
+   private final boolean useMetrics;
+
+   /** Reference to the Kafka consumer, once it is created */
+   private volatile KafkaConsumer consumer;
+
+   /** Flag to mark the main work loop as alive */
+   private volatile boolean running;
+
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
+
+   public KafkaConsumerThread(
+   Logger log,
+   Handover handover,
+   Properties kafkaProperties,
+   KafkaTopicPartitionState[] 
subscribedPartitions,
+   MetricGroup kafkaMetricGroup,
 

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88258273
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 ---
@@ -172,8 +173,14 @@ public void cancel() {
 
// --- add consumer dataflow --
 
+   // the consumer should only poll very small chunks
+   Properties consumerProps = new Properties();
+   consumerProps.putAll(standardProps);
+   consumerProps.putAll(secureProps);
+   consumerProps.setProperty("fetch.message.max.bytes", "100");
--- End diff --

This is a leftover from getting the "short retention" tests to run with the 
modified source. Will undo.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88257338
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link 
KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need to two versions whose compiled code goes 
against different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+   @Override
+   public void assignPartitions(KafkaConsumer consumer, 
List topicPartitions) throws Exception {
--- End diff --

I think generic is nice, because for this method, the key/value types do 
not matter. That way it is more future proof.
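
A small sketch of the point about generics, assuming the comment refers to a wildcard-typed consumer parameter. `ConsumerSketch` and `CallBridgeSketch` are hypothetical stand-ins (partitions are plain strings here) and not the Kafka or Flink classes. Because assignment never touches keys or values, the bridge method can accept a consumer with any type arguments:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a consumer parameterized by key and value types. */
final class ConsumerSketch<K, V> {

    final List<String> assigned = new ArrayList<>();

    void assign(List<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }
}

/** Hypothetical stand-in for the call bridge. */
final class CallBridgeSketch {

    /** The wildcards accept any key/value types, since assignment never reads records. */
    void assignPartitions(ConsumerSketch<?, ?> consumer, List<String> partitions) {
        consumer.assign(partitions);
    }
}
```

The same bridge instance then works for a `ConsumerSketch<byte[], byte[]>` as well as for any other parameterization, without casts or raw types.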




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r88245373
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
--- End diff --

I left this package-private, because it is accessed by the nested class for 
the commit callback.
If I make it private, the compiler has to inject a bridge method.

I guess making it private is correct, though, since it better documents how 
it should be used.
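
For illustration, a minimal sketch of the situation described here; `OuterSketch` and `CommitCallbackSketch` are hypothetical names, not the classes in this pull request. With Java compilers before Java 11, a nested class reading a private field of its enclosing class goes through a compiler-generated synthetic accessor. From Java 11 on, nest-based access control (JEP 181) makes that accessor unnecessary. Either way the source compiles and behaves the same:

```java
/** Hypothetical outer class with a private field that a nested class uses. */
final class OuterSketch {

    // private: documents that only this class and its nested classes should use it
    private final StringBuilder log = new StringBuilder();

    /** Hypothetical nested callback that writes to the outer class's private field. */
    final class CommitCallbackSketch {
        void onComplete(String message) {
            log.append(message);
        }
    }

    String logged() {
        return log.toString();
    }
}
```

The trade-off is therefore only about the generated bytecode on older compilers, not about visible behaviour.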




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87742870
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -279,32 +152,37 @@ public void run() {
break;
}
 
-   // emit the actual record. this 
also update offset state atomically
+   // emit the actual record. this 
also updates offset state atomically
// and deals with timestamps 
and watermark generation
emitRecord(value, partition, 
record.offset(), record);
}
}
}
-   // end main fetch loop
-   }
-   catch (Throwable t) {
-   if (running) {
-   running = false;
-   errorHandler.reportError(t);
-   } else {
-   LOG.debug("Stopped ConsumerThread threw 
exception", t);
-   }
}
finally {
-   try {
-   consumer.close();
-   }
-   catch (Throwable t) {
-   LOG.warn("Error while closing Kafka 0.9 
consumer", t);
-   }
+   // this signals the consumer thread that no more work 
is to be done
+   consumerThread.shutdown();
+   }
+
+   // on a clean exit, wait for the runner thread
+   try {
+   consumerThread.join();
+   }
+   catch (InterruptedException e) {
+   // may be the result of a wake-up interruption after an 
exception.
+   // we ignore this here and only restore the 
interruption state
+   Thread.currentThread().interrupt();
}
}
 
+   @Override
+   public void cancel() {
+   // flag the main thread to exit. A thread interrupt will come 
anyways.
+   running = false;
+   handover.close();
--- End diff --

We might not need to call `close()` on the handover here. Please see my 
above comments.
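
The shutdown-and-join part of the diff above can be sketched in isolation as follows. This is a simplified illustration with hypothetical names (`ConsumerThreadSketch`, `FetcherShutdownSketch`). The sleep stands in for the consumer's poll call, and the real thread would additionally need to wake up a blocked consumer.

```java
/** Hypothetical stand-in for the consumer thread: loops until asked to stop. */
final class ConsumerThreadSketch extends Thread {

    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(5); // stands in for polling the consumer with a timeout
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    /** Signals the loop to stop without interrupting the thread. */
    void shutdown() {
        running = false;
    }
}

final class FetcherShutdownSketch {

    /** Returns true if the join completed, false if the caller was interrupted while waiting. */
    static boolean shutdownAndJoin(ConsumerThreadSketch thread) {
        thread.shutdown();
        try {
            thread.join();
            return true;
        } catch (InterruptedException e) {
            // do not swallow the interruption: restore the flag for the caller
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Restoring the interrupt flag instead of rethrowing keeps the cancellation signal visible to whatever code runs next on the calling thread.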




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87744744
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -323,8 +329,154 @@ else if (partition.topic().equals("another")) {
 
// check that there were no errors in the fetcher
final Throwable caughtError = error.get();
-   if (caughtError != null) {
+   if (caughtError != null && !(caughtError instanceof 
Handover.ClosedException)) {
--- End diff --

Perhaps we should prevent the fetcher from throwing 
`Handover.ClosedException`, as it doesn't really make sense to the main thread. 
Please see my above comments.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87738353
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -143,133 +123,26 @@ public Kafka09Fetcher(
 
@Override
public void runFetchLoop() throws Exception {
--- End diff --

We will be throwing all exceptions, even if it's a 
`Handover.ClosedException`, correct?

I wonder if it makes sense to suppress `Handover.ClosedException`s so that 
they do not reach the main task thread, and only restore the interruption state 
that follows `cancel()`? So basically, we would catch `InterruptedException` 
over the whole `runFetchLoop()` scope.

This is what the exception passing behaviour was like before: when 
`cancel()` was called on the fetcher, we did not throw any other exceptions and 
only restored the interruption state on the main task thread.
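
A rough sketch of the earlier behaviour described in this comment, with hypothetical names (`CancelAwareLoopSketch`, `InterruptibleBody`). Rethrowing the exception when `cancel()` was not called is an assumption of this sketch, not something stated in the thread.

```java
/** Hypothetical wrapper around a fetch loop body that is aware of cancellation. */
final class CancelAwareLoopSketch {

    /** Hypothetical functional interface standing in for the fetch loop body. */
    interface InterruptibleBody {
        void run() throws Exception;
    }

    private volatile boolean running = true;

    void cancel() {
        running = false;
    }

    /**
     * Runs the body. An InterruptedException that follows cancel() is not rethrown;
     * only the interrupt flag is restored for the calling thread.
     */
    void run(InterruptibleBody body) throws Exception {
        try {
            body.run();
        } catch (InterruptedException e) {
            if (running) {
                throw e; // assumption: an interrupt without cancel() is abnormal
            }
            Thread.currentThread().interrupt();
        }
    }
}
```

The risk with this variant is the one raised elsewhere in the review: if the loop ends for a reason nobody anticipated, suppressing the exception can hide it.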




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87744290
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
+
+   /** The handover of data and exceptions between the consumer thread and 
the task thread */
+   private final Handover handover;
+
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+   /** The configuration for the Kafka consumer */
+   private final Properties kafkaProperties;
+
+   /** The partitions that this consumer reads from */ 
+   private final KafkaTopicPartitionState[] 
subscribedPartitions;
+
+   /** We get this from the outside to publish metrics. **/
+   private final MetricGroup kafkaMetricGroup;
+
+   /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
+   private final KafkaConsumerCallBridge consumerCallBridge;
+
+   /** The maximum number of milliseconds to wait for a fetch batch */
+   private final long pollTimeout;
+
+   /** Flag whether to add Kafka's metrics to the Flink metrics */
+   private final boolean useMetrics;
+
+   /** Reference to the Kafka consumer, once it is created */
+   private volatile KafkaConsumer consumer;
+
+   /** Flag to mark the main work loop as alive */
+   private volatile boolean running;
+
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
+
+   public KafkaConsumerThread(
+   Logger log,
+   Handover handover,
+   Properties kafkaProperties,
+   KafkaTopicPartitionState[] 
subscribedPartitions,
+   MetricGroup kafkaMetricGroup,

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87734223
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link 
KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need to two versions whose compiled code goes 
against different method signatures.
--- End diff --

nit: we need "to" two versions <-- redundant "to".




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87735824
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
 ---
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention010ITCase extends 
KafkaShortRetentionTestBase {
--- End diff --

+1 to remove these tests for 0.9+ connectors.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87734293
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link 
KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need to two versions whose compiled code goes 
against different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+   @Override
+   public void assignPartitions(KafkaConsumer consumer, 
List topicPartitions) throws Exception {
--- End diff --

Do the type parameters for the key / value of `KafkaConsumer` need to be 
generic? It seems we will only be using `` anyway.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87746033
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import 
org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between Kafka Consumer Thread and the 
fetcher's main thread. 
+ */
+public class HandoverTest {
+
+   // 

+   //  test produce / consumer
+   // 

+
+   @Test
+   public void testWithVariableProducer() throws Exception {
+   runProducerConsumerTest(500, 2, 0);
+   }
+
+   @Test
+   public void testWithVariableConsumer() throws Exception {
+   runProducerConsumerTest(500, 0, 2);
+   }
+
+   @Test
+   public void testWithVariableBoth() throws Exception {
+   runProducerConsumerTest(500, 2, 2);
+   }
+
+   private void runProducerConsumerTest(int numRecords, int 
maxProducerDelay, int maxConsumerDelay) throws Exception {
--- End diff --

nit: Can we move this private method down to the bottom of the file? Not 
strictly necessary; I just prefer keeping private methods after the public 
ones.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87746504
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import 
org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between Kafka Consumer Thread and the 
fetcher's main thread. 
+ */
+public class HandoverTest {
+
+   // 

+   //  test produce / consumer
+   // 

+
+   @Test
+   public void testWithVariableProducer() throws Exception {
+   runProducerConsumerTest(500, 2, 0);
+   }
+
+   @Test
+   public void testWithVariableConsumer() throws Exception {
+   runProducerConsumerTest(500, 0, 2);
+   }
+
+   @Test
+   public void testWithVariableBoth() throws Exception {
+   runProducerConsumerTest(500, 2, 2);
+   }
+
+   private void runProducerConsumerTest(int numRecords, int 
maxProducerDelay, int maxConsumerDelay) throws Exception {
+   // generate test data
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   final ConsumerRecords[] data = new 
ConsumerRecords[numRecords];
+   for (int i = 0; i < numRecords; i++) {
+   data[i] = createTestRecords();
+   }
+
+   final Handover handover = new Handover();
+
+   ProducerThread producer = new ProducerThread(handover, data, 
maxProducerDelay);
+   ConsumerThread consumer = new ConsumerThread(handover, data, 
maxConsumerDelay);
+
+   consumer.start();
+   producer.start();
+
+   // sync first on the consumer, so it propagates assertion errors
+   consumer.sync();
+   producer.sync();
+   }
+
+   // 

+   //  test error propagation
+   // 

+
+   @Test
+   public void testPublishErrorOnEmptyHandover() throws Exception {
+   final Handover handover = new Handover();
+
+   Exception error = new Exception();
+   handover.reportError(error);
+
+   try {
+   handover.pollNext();
+   fail("should throw an exception");
+   }
+   catch (Exception e) {
+   assertEquals(error, e);
+   }
+   }
+
+   @Test
+   public void testPublishErrorOnFullHandover() throws Exception {
+   final Handover handover = new Handover();
+   handover.produce(createTestRecords());
+
+   IOException error = new IOException();
+   handover.reportError(error);
+
+   try {
+   handover.pollNext();
+   fail("should throw an exception");
+   }
+   catch (Exception e) {
+   assertEquals(error, e);
+   }
+   }
+
+   @Test
+   public void testExceptionMarksClosedOnEmpty() throws Exception {
+   final Handover handover = new Handover();
+
+   IllegalStateException 

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87738700
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
+
+   /** The handover of data and exceptions between the consumer thread and 
the task thread */
+   private final Handover handover;
+
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference> 
nextOffsetsToCommit;
+
+   /** The configuration for the Kafka consumer */
+   private final Properties kafkaProperties;
+
+   /** The partitions that this consumer reads from */ 
+   private final KafkaTopicPartitionState[] 
subscribedPartitions;
+
+   /** We get this from the outside to publish metrics. **/
+   private final MetricGroup kafkaMetricGroup;
+
+   /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
+   private final KafkaConsumerCallBridge consumerCallBridge;
+
+   /** The maximum number of milliseconds to wait for a fetch batch */
+   private final long pollTimeout;
+
+   /** Flag whether to add Kafka's metrics to the Flink metrics */
+   private final boolean useMetrics;
+
+   /** Reference to the Kafka consumer, once it is created */
+   private volatile KafkaConsumer consumer;
+
+   /** Flag to mark the main work loop as alive */
+   private volatile boolean running;
+
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
+
+   public KafkaConsumerThread(
+   Logger log,
+   Handover handover,
+   Properties kafkaProperties,
+   KafkaTopicPartitionState[] 
subscribedPartitions,
+   MetricGroup kafkaMetricGroup,

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87736264
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
 ---
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention010ITCase extends 
KafkaShortRetentionTestBase {
--- End diff --

I think with this removal, we can also completely remove 
`runFailOnAutoOffsetResetNone()` from the `KafkaShortRetentionTestBase`.

The 0.8 connector runs `runFailOnAutoOffsetResetNoneEager()` instead of 
`runFailOnAutoOffsetResetNone()`. I think this is what we should actually be 
doing for the 0.9+ connectors as well: testing only the eager version, because 
that's a Flink-specific behaviour. (Just pointing this out; we can add this as 
a separate future task, as it probably requires some work on 0.9+.)




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87745778
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 ---
@@ -172,8 +173,14 @@ public void cancel() {
 
// --- add consumer dataflow --
 
+   // the consumer should only poll very small chunks
+   Properties consumerProps = new Properties();
+   consumerProps.putAll(standardProps);
+   consumerProps.putAll(secureProps);
+   consumerProps.setProperty("fetch.message.max.bytes", "100");
--- End diff --

I think we shouldn't be setting `fetch.message.max.bytes` here. The config 
key for this setting has changed across Kafka versions (for 0.9+ it's 
`max.partition.fetch.bytes`). The version-specific `standardProps` already set 
values for this config.

So, the original `props` that only contains `standardProps` and 
`secureProps` should be enough for the test to work.
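
If the test ever does need to override the fetch size explicitly, a minimal sketch of keeping the key version-specific could look like the following. `FetchSizeConfig`, `fetchSizeKey` and `withFetchSize` are hypothetical names for illustration, not existing Flink test utilities, and the sketch assumes that only the 0.8 connector understands `fetch.message.max.bytes`.

```java
import java.util.Properties;

class FetchSizeConfig {

    // Hypothetical helper: picks the fetch-size config key for a connector
    // version. Assumes 0.8 uses the old key and every later version the new one.
    static String fetchSizeKey(String connectorVersion) {
        return connectorVersion.startsWith("0.8")
                ? "fetch.message.max.bytes"
                : "max.partition.fetch.bytes";
    }

    // Copies the base properties and sets the fetch size under the key that
    // matches the given connector version. The base properties are not modified.
    static Properties withFetchSize(Properties base, String connectorVersion, int bytes) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(fetchSizeKey(connectorVersion), Integer.toString(bytes));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("group.id", "short-retention-test");

        Properties props = withFetchSize(base, "0.9", 100);
        System.out.println(props.getProperty("max.partition.fetch.bytes"));
    }
}
```

That said, relying on the values already set in the version-specific `standardProps` avoids the need for such a helper in the first place.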




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87739024
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
+
+   /** The handover of data and exceptions between the consumer thread and 
the task thread */
+   private final Handover handover;
+
+   /** The next offsets that the main thread should commit */
+   private final AtomicReference> 
nextOffsetsToCommit;
+
+   /** The configuration for the Kafka consumer */
+   private final Properties kafkaProperties;
+
+   /** The partitions that this consumer reads from */ 
+   private final KafkaTopicPartitionState[] 
subscribedPartitions;
+
+   /** We get this from the outside to publish metrics. **/
+   private final MetricGroup kafkaMetricGroup;
+
+   /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
+   private final KafkaConsumerCallBridge consumerCallBridge;
+
+   /** The maximum number of milliseconds to wait for a fetch batch */
+   private final long pollTimeout;
+
+   /** Flag whether to add Kafka's metrics to the Flink metrics */
+   private final boolean useMetrics;
+
+   /** Reference to the Kafka consumer, once it is created */
+   private volatile KafkaConsumer consumer;
+
+   /** Flag to mark the main work loop as alive */
+   private volatile boolean running;
+
+   /** Flag tracking whether the latest commit request has completed */
+   private volatile boolean commitInProgress;
+
+
+   public KafkaConsumerThread(
+   Logger log,
+   Handover handover,
+   Properties kafkaProperties,
+   KafkaTopicPartitionState[] 
subscribedPartitions,
+   MetricGroup kafkaMetricGroup,

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87745858
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 ---
@@ -122,14 +122,15 @@ public static void shutDownServices() {
 *
 */
private static boolean stopProducer = false;
+
public void runAutoOffsetResetTest() throws Exception {
final String topic = "auto-offset-reset-test";
 
final int parallelism = 1;
final int elementsPerPartition = 5;
 
Properties tprops = new Properties();
-   tprops.setProperty("retention.ms", "250");
+   tprops.setProperty("retention.ms", "100");
--- End diff --

Is this change necessary?




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87621040
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread the runs the {@link KafkaConsumer}, connecting to the 
brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by 
the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * IMPORTANT: This thread must not be interrupted when 
attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, 
and to even
+ * deadlock in certain situations.
+ * 
+ * Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+   /** Logger for this consumer */
+   final Logger log;
--- End diff --

Private?




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87625136
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class Handover implements Closeable {
+
+   private final Object lock = new Object();
+
+   private ConsumerRecords next;
+   private Throwable error;
+   private boolean wakeup;
+
+   @Nonnull
+   public ConsumerRecords pollNext() throws Exception {
+   synchronized (lock) {
+   while (next == null && error == null) {
+   lock.wait();
+   }
+
+   ConsumerRecords n = next;
+   if (n != null) {
+   next = null;
+   lock.notifyAll();
+   return n;
+   }
+   else {
+   ExceptionUtils.rethrowException(error, 
error.getMessage());
+
+   // this statement cannot be reached since the 
above method always throws an exception
+   // this is only here to silence the compiler 
and any warnings
+   return ConsumerRecords.empty(); 
+   }
+   }
+   }
+
+   public void produce(final ConsumerRecords element)
+   throws InterruptedException, WakeupException, 
ClosedException {
+
+   checkNotNull(element);
+
+   synchronized (lock) {
+   while (next != null && !wakeup) {
+   lock.wait();
+   }
+
+   wakeup = false;
+
+   // if there is still an element, we must have been 
woken up
+   if (next != null) {
+   throw new WakeupException();
+   }
+   // if there is no error, then this is open and can 
accept this element
+   else if (error == null) {
+   next = element;
+   lock.notifyAll();
+   }
+   // an error marks this as closed for the producer
+   else {
+   throw new ClosedException();
+   }
+   }
+   }
+
+   public void reportError(Throwable t) {
+   checkNotNull(t);
+
+   synchronized (lock) {
+   // do not override the initial exception
+   if (error == null) {
+   error = t;
+   }
+   next = null;
+   lock.notifyAll();
+   }
+   }
+
+   @Override
+   public void close() {
+   synchronized (lock) {
+   next = null;
+   wakeup = false;
+
+   if (error == null) {
+   error = new ClosedException();
+   }
+   lock.notifyAll();
+   }
+   }
+
+   public void wakeupProducer() {
+   synchronized (lock) {
+   wakeup = true;
+   lock.notifyAll();
+   }
+   }
+

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87626228
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -66,36 +57,15 @@
/** The schema to convert between Kafka's byte messages, and Flink's 
objects */
private final KeyedDeserializationSchema deserializer;
 
-   /** The configuration for the Kafka consumer */
-   private final Properties kafkaProperties;
+   /** The handover of data and exceptions between the consumer thread and 
the task thread */
+   private final Handover handover;
 
-   /** The maximum number of milliseconds to wait for a fetch batch */
-   private final long pollTimeout;
-
-   /** The next offsets that the main thread should commit */
-   private final AtomicReference> 
nextOffsetsToCommit;
-   
-   /** The callback invoked by Kafka once an offset commit is complete */
-   private final OffsetCommitCallback offsetCommitCallback;
-
-   /** Reference to the Kafka consumer, once it is created */
-   private volatile KafkaConsumer consumer;
-   
-   /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
-   private volatile ExceptionProxy errorHandler;
+   /** The thread that runs the proper KafkaConsumer and hand the record batches to this fetcher */
--- End diff --

nit: 'proper' confused me a bit at first. Perhaps 'actual'?




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87624670
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class Handover implements Closeable {
+
+   private final Object lock = new Object();
+
+   private ConsumerRecords next;
+   private Throwable error;
+   private boolean wakeup;
--- End diff --

Can we perhaps rename this to `producerWakeup`? That would be less confusing,
since it only affects the producer side of the handover.




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87621534
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class Handover implements Closeable {
--- End diff --

It would be great if this class had some Javadoc too ;)




[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2789#discussion_r87621606
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public final class Handover implements Closeable {
+
+   private final Object lock = new Object();
+
+   private ConsumerRecords next;
+   private Throwable error;
+   private boolean wakeup;
+
+   @Nonnull
+   public ConsumerRecords pollNext() throws Exception {
+   synchronized (lock) {
+   while (next == null && error == null) {
+   lock.wait();
+   }
+
+   ConsumerRecords n = next;
+   if (n != null) {
+   next = null;
+   lock.notifyAll();
+   return n;
+   }
+   else {
+   ExceptionUtils.rethrowException(error, error.getMessage());
+
+   // this statement cannot be reached since the above method always throws an exception
+   // this is only here to silence the compiler and any warnings
+   return ConsumerRecords.empty();
+   }
+   }
+   }
+
+   public void produce(final ConsumerRecords element)
+   throws InterruptedException, WakeupException, ClosedException {
+
+   checkNotNull(element);
+
+   synchronized (lock) {
+   while (next != null && !wakeup) {
+   lock.wait();
+   }
+
+   wakeup = false;
+
+   // if there is still an element, we must have been woken up
+   if (next != null) {
+   throw new WakeupException();
+   }
+   // if there is no error, then this is open and can accept this element
+   else if (error == null) {
+   next = element;
+   lock.notifyAll();
+   }
+   // an error marks this as closed for the producer
+   else {
+   throw new ClosedException();
+   }
+   }
+   }
+
+   public void reportError(Throwable t) {
+   checkNotNull(t);
+
+   synchronized (lock) {
+   // do not override the initial exception
+   if (error == null) {
+   error = t;
+   }
+   next = null;
+   lock.notifyAll();
+   }
+   }
+
+   @Override
+   public void close() {
+   synchronized (lock) {
+   next = null;
+   wakeup = false;
+
+   if (error == null) {
+   error = new ClosedException();
+   }
+   lock.notifyAll();
+   }
+   }
+
+   public void wakeupProducer() {
+   synchronized (lock) {
+   wakeup = true;
+   lock.notifyAll();
+   }
+   }
+

[GitHub] flink pull request #2789: [FLINK-5048] [Kafka Consumer] Change thread model ...

2016-11-11 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2789

[FLINK-5048] [Kafka Consumer] Change thread model of FlinkKafkaConsumer to
better handle shutdown/interrupt situations

**NOTE:** Only the second commit is relevant; the first commit only
prepares for it by cleaning up some code in the Flink Kafka Consumers for 0.9
and 0.10.

## Rationale

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate
thread that operated Kafka's consumer. That thread was shielded from
interrupts, because the Kafka Consumer has not been handling thread interrupts
well.

Since that thread was also the thread that emitted records, it could block
in the network stack (backpressure) or in chained operators. The latter case
led to situations where cancellation became very slow unless that thread was
interrupted (which it could not be).

## Core changes

This commit changes the thread model:

  - A spawned consumer thread polls a batch of records from the
KafkaConsumer and pushes the batch of records into a sort of blocking queue.
  - The main thread of the task pulls the record batches from the
blocking queue and emits the records.

The "batches" are the fetch batches from Kafka's consumer; there is no
additional buffering that would impact latency.

The thread-to-thread handover of the record batches is handled by a class
`Handover`, which is a size-one blocking queue with the additional ability to
gracefully wake up the consumer thread if the main thread decides to shut down.
That way, we need no interrupts on the KafkaConsumerThread.
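
The handover idea can be illustrated with a small, self-contained sketch. This
is not the PR's code: `SimpleHandover`, `HandoverDemo`, `run()` and the
`running` flag are hypothetical names, the element type is generic instead of
`ConsumerRecords`, and `produce` returns `false` where the PR's `Handover`
throws `WakeupException` or `ClosedException`. It only shows, under those
assumptions, how a size-one slot plus a wake-up flag lets the task thread stop
the consumer thread without interrupting it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified stand-in for the PR's Handover class. */
final class SimpleHandover<T> {
    private final Object lock = new Object();
    private T next;                 // the single slot of the "queue"
    private Throwable error;        // non-null once the handover is closed
    private boolean producerWakeup; // one-shot request to unblock the producer

    /** Task thread: blocks until a batch is available or the handover is closed. */
    T pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (next == null) {
                throw new Exception("handover closed", error);
            }
            T n = next;
            next = null;
            lock.notifyAll(); // the slot is free again
            return n;
        }
    }

    /** Consumer thread: returns false if woken up while the slot is full, or if closed. */
    boolean produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null && !producerWakeup) {
                lock.wait();
            }
            producerWakeup = false;
            if (next != null || error != null) {
                return false;
            }
            next = element;
            lock.notifyAll();
            return true;
        }
    }

    void wakeupProducer() {
        synchronized (lock) {
            producerWakeup = true;
            lock.notifyAll();
        }
    }

    void close() {
        synchronized (lock) {
            next = null;
            if (error == null) {
                error = new IllegalStateException("closed");
            }
            lock.notifyAll();
        }
    }
}

final class HandoverDemo {
    /** Hands three batches from a consumer thread to the caller, then shuts down. */
    static List<String> run() {
        SimpleHandover<String> handover = new SimpleHandover<>();
        AtomicBoolean running = new AtomicBoolean(true);
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; running.get(); i++) {
                    handover.produce("batch-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerThread.start();
        List<String> emitted = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                emitted.add(handover.pollNext());
            }
            running.set(false);        // ask the consumer loop to stop ...
            handover.wakeupProducer(); // ... and release it if it waits on a full slot
            consumerThread.join();
            handover.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [batch-0, batch-1, batch-2]
    }
}
```

Because the wake-up flag in this sketch is consumed by the next `produce` call,
it is paired with a separate stop flag that the consumer loop checks; whether
the PR pairs them the same way is an assumption here.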

This also pulls the KafkaConsumerThread out of the fetcher class for some
code cleanup (scope simplifications).
The method calls that changed incompatibly between Kafka 0.9 and 0.10 are
handled via a "call bridge", which leads to fewer code changes in the fetchers
for each method that needs to be adapted.

## Tests

This adjusts some tests, but it removes the "short retention IT Cases" for
Kafka 0.9 and 0.10 consumers.
While that type of test makes sense for the 0.8 consumer, for the newer
ones the tests actually test purely Kafka code and no Flink code.

In addition, they are virtually impossible to run in a stable and fast manner,
because they rely on an artificial slowdown in the KafkaConsumer threads. That
type of unhealthy interference is exactly what this patch prevents ;-)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_consumer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2789


commit f6cd417cdf37213f88c62e9342206e249402eac6
Author: Stephan Ewen 
Date:   2016-11-09T16:58:54Z

[hotfix] [Kafka Consumer] Clean up some code confusion and style in the 
Fetchers for Kafka 0.9/0.10

commit 9a0786508b9a13cd986de593c6bdb2ecdb1737a8
Author: Stephan Ewen 
Date:   2016-11-10T10:13:43Z

[FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to
better handle shutdown/interrupt situations

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawns a separate
thread that operates Kafka's consumer.
That thread was shielded from interrupts, because the Kafka Consumer has not
been handling thread interrupts well.
Since that thread was also the thread that emitted records, it would block
in the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellations got very slow unless
that thread would be interrupted (which it could not be).

This commit changes the thread model:
  - A spawned consumer thread polls a batch of records from the
KafkaConsumer and pushes the
batch of records into a blocking queue (size one)
  - The main thread of the task will pull the record batches from the
blocking queue and
emit the records.



