[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190530647
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -240,7 +249,9 @@ public void run() {
 					newPartitions = unassignedPartitionsQueue.getBatchBlocking();
 				}
 				if (newPartitions != null) {
-					reassignPartitions(newPartitions);
+					reassignPartitions(newPartitions, new HashSet<>());
--- End diff --

I just realized this should actually be
`reassignPartitions(newPartitions, partitionsToBeRemoved);`


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190529981
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 	 * This method is exposed for testing purposes.
 	 */
 	@VisibleForTesting
-	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
-		if (newPartitions.size() == 0) {
+	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
--- End diff --

I thought about it, but my only concern is the case where we have both partitions to add and partitions to remove...
`consumerCallBridge.assignPartitions()` takes the whole new list of partitions, so in that case we would need to wait for the first assignment (e.g. adding the new partitions) before doing the second assignment (e.g. removing partitions) in order to keep the list of partitions consistent.
I think we should try to have only one call to `consumerCallBridge.assignPartitions()`.

Maybe I could refactor the part where partitions are removed from the old partitions into a separate private method like `removeFromOldPartitions()`?

What do you think?
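For illustration, a rough standalone sketch of the single-call idea (`TP` stands in for the partition handle type, and `assign()` stands in for `consumerCallBridge.assignPartitions()`; this is not the actual `KafkaConsumerThread` code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class ReassignSketch<TP> {

	private List<TP> subscribed = new ArrayList<>();

	void reassignPartitions(List<TP> newPartitions, Set<TP> partitionsToBeRemoved) {
		// 1) drop the removed partitions from the current subscription
		List<TP> target = removeFromOldPartitions(partitionsToBeRemoved);
		// 2) append the newly discovered partitions
		target.addAll(newPartitions);
		// 3) one single, consistent assignment call
		assign(target);
		subscribed = target;
	}

	private List<TP> removeFromOldPartitions(Set<TP> partitionsToBeRemoved) {
		List<TP> remaining = new ArrayList<>(subscribed);
		remaining.removeIf(partitionsToBeRemoved::contains);
		return remaining;
	}

	private void assign(List<TP> target) {
		// placeholder for consumerCallBridge.assignPartitions(target)
	}
}
```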


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190519624
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
 			Pattern topicPattern,
 			KeyedDeserializationSchema<T> deserializer,
 			long discoveryIntervalMillis,
-			boolean useMetrics) {
+			boolean useMetrics,
+			boolean checkUnavailablePartitions) {
--- End diff --

I did it that way only because this is something new, so I thought that you might want it to be configurable. But you are right, I cannot think of a case where we would prefer to keep the unavailable partitions.
I'll update the PR to make it the default behaviour if that's OK with you.


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190518142
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -80,6 +83,9 @@
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
 
+	/** The list of partitions to be removed from kafka consumer. */
+	private final Set<TopicPartition> partitionsToBeRemoved;
--- End diff --

From my understanding, we can use a queue for unassigned partitions because it does not matter which consumer takes the new partitions.
But we cannot use a queue for partitions to be removed, because a partition can only be removed by the consumer that is actually subscribed to it.
Does that make sense?
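To illustrate the difference, a simplified sketch (placeholder types, not the real consumer thread): each thread checks the shared set only against its own subscription, and entries for partitions owned by other consumers are simply not its concern.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RemovalCheckSketch {

	private final List<String> mySubscription = new ArrayList<>();
	private final Set<String> partitionsToBeRemoved = new HashSet<>();

	/** Called from this consumer thread's main loop. */
	List<String> partitionsToDrop() {
		List<String> toDrop = new ArrayList<>();
		for (String partition : mySubscription) {
			// only the thread actually subscribed to a partition can unassign it
			if (partitionsToBeRemoved.contains(partition)) {
				toDrop.add(partition);
			}
		}
		return toDrop;
	}
}
```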


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190153528
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -80,6 +83,9 @@
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
 
+	/** The list of partitions to be removed from kafka consumer. */
+	private final Set<TopicPartition> partitionsToBeRemoved;
--- End diff --

Would it actually make more sense to have a queue for this, like how we are handling unassigned new partitions via the `unassignedPartitionsQueue`? The fact that this is a set means that we will eventually need to remove entries from it anyway.
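Roughly what I mean, as a simplified sketch (a plain `Queue` standing in for Flink's `ClosableBlockingQueue`):

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

final class RemovalQueueSketch {

	private final Queue<String> partitionsToBeRemovedQueue = new ArrayDeque<>();

	/** Drained each loop iteration, like unassignedPartitionsQueue. */
	Set<String> drainRemovals() {
		Set<String> batch = new HashSet<>();
		String partition;
		while ((partition = partitionsToBeRemovedQueue.poll()) != null) {
			batch.add(partition);
		}
		// once drained, the entries are consumed -- nothing to clean up later
		return batch;
	}
}
```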


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190150419
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
 			Pattern topicPattern,
 			KeyedDeserializationSchema<T> deserializer,
 			long discoveryIntervalMillis,
-			boolean useMetrics) {
+			boolean useMetrics,
+			boolean checkUnavailablePartitions) {
--- End diff --

Why do we want this to be configurable? Is there any case in which we would prefer to leave the unavailable partitions untouched?


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190152570
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 	 * This method is exposed for testing purposes.
 	 */
 	@VisibleForTesting
-	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
-		if (newPartitions.size() == 0) {
+	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
--- End diff --

I have the feeling that this method is way too complex now, to the point that it might make more sense to break it up into 2 different methods: `addPartitionsToAssignment` and `removePartitionsFromAssignment`.
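Something along these lines (a minimal sketch with placeholder state, using the method names above; the `reassign()` call stands in for `consumerCallBridge.assignPartitions()`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class SplitSketch<TP> {

	private final List<TP> assignment = new ArrayList<>();

	void addPartitionsToAssignment(List<TP> newPartitions) {
		assignment.addAll(newPartitions);
		reassign();
	}

	void removePartitionsFromAssignment(Set<TP> partitionsToBeRemoved) {
		assignment.removeIf(partitionsToBeRemoved::contains);
		reassign();
	}

	private void reassign() {
		// placeholder for the consumerCallBridge.assignPartitions() call
	}
}
```

(Calling both methods back to back would mean two assignment calls, which is the trade-off raised in the reply above.)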


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-16 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r188596584
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -80,6 +82,9 @@
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
 
+	/** The list of partitions to be removed from kafka consumer. */
+	private final List<TopicPartition> partitionsToBeRemoved;
--- End diff --

You are right, a `Set` would be better for all the calls to the `contains()` method.
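For reference, a minimal illustration of the lookup difference (hypothetical topic names):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ContainsDemo {
	public static void main(String[] args) {
		// contains() is O(n) on a List but O(1) on average on a HashSet,
		// which matters when the check runs once per subscribed partition.
		List<String> asList = Arrays.asList("topic-0", "topic-1", "topic-2");
		Set<String> asSet = new HashSet<>(asList);

		System.out.println(asList.contains("topic-1")); // linear scan
		System.out.println(asSet.contains("topic-1"));  // hash lookup
	}
}
```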


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-16 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r188564095
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
 			getLong(
 				checkNotNull(props, "props"),
 				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
-			!getBoolean(props, KEY_DISABLE_METRICS, false));
+			!getBoolean(props, KEY_DISABLE_METRICS, false),
+			getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
--- End diff --

You're right, I'll change it


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-11 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r187647275
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
 			getLong(
 				checkNotNull(props, "props"),
 				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
-			!getBoolean(props, KEY_DISABLE_METRICS, false));
+			!getBoolean(props, KEY_DISABLE_METRICS, false),
+			getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
--- End diff --

Should this be named `KEY_CHECK_UNAVAILABLE_PARTITIONS`?


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-11 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r187648458
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -374,8 +384,8 @@ void setOffsetsToCommit(
 	 * This method is exposed for testing purposes.
 	 */
 	@VisibleForTesting
-	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
-		if (newPartitions.size() == 0) {
+	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, List<TopicPartition> partitionsToBeRemoved) throws Exception {
+		if (newPartitions.size() == 0 && partitionsToBeRemoved.isEmpty()) {
--- End diff --

`newPartitions.size() == 0` -> `!newPartitions.isEmpty()`


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-11 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r187647828
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -80,6 +82,9 @@
/** The queue of unassigned partitions that we need to assign to the 
Kafka consumer. */
private final 
ClosableBlockingQueue 
unassignedPartitionsQueue;
 
+   /** The list of partitions to be removed from kafka consumer. */
+   private final List partitionsToBeRemoved;
--- End diff --

Should this be a `Set`, to facilitate fast lookup?


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-11 Thread EAlexRojas
GitHub user EAlexRojas opened a pull request:

https://github.com/apache/flink/pull/5991

[FLINK-9303] [kafka] Adding support for unassign dynamically partitions 
from kafka consumer when they become unavailable

## What is the purpose of the change

This pull request adds an option on the Kafka consumer to check for unavailable partitions and unassign them from the consumer. That way the consumer does not request records from invalid partitions, which prevents log noise.

## Brief change log

- Modify the partition discovery system to check not only for new partitions, but also for partitions that are no longer available.
- Check whether partitions recovered from state are still available.
- Add an option on the Kafka consumer to activate these checks.

## Verifying this change

This change can be verified as follows:

*Manually verified as follows:*
- Create a job with a Kafka consumer listening to a topic pattern, with partition discovery activated and the property introduced in this PR set to true.
- Configure Kafka with the following properties:
   delete.topic.enable=true
   auto.create.topics.enable=false
- Create some topics matching the pattern.
- Run the job.
- While it is running, delete some of the topics.
- Verify that the partitions are unassigned and that the job continues running without log noise.

*I guess this could be covered by e2e tests, but I'm not familiar with the system in place.*
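For reference, a minimal job along the lines of the manual steps above, using the 0.10 consumer as an example; the property key string for the new option is a placeholder, since its final name (the value of `KEY_CHECK_UNAVAILABLE_TOPICS`) is still being discussed in this PR:

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class UnassignPartitionsManualTest {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "unassign-test");
		// existing Flink option: run partition discovery every 10 seconds
		props.setProperty("flink.partition-discovery.interval-millis", "10000");
		// placeholder key for the option introduced in this PR
		props.setProperty("flink.check-unavailable-topics", "true");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env
			.addSource(new FlinkKafkaConsumer010<>(
				Pattern.compile("test-topic-[0-9]+"),
				new SimpleStringSchema(),
				props))
			.print();
		env.execute("kafka-unassign-partitions-manual-test");
	}
}
```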

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EAlexRojas/flink kafka-unassign-partitions-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5991


commit a17d0dcdeaac5b2508f4748d08fd4cb879fa5033
Author: EAlexRojas 
Date:   2018-04-18T14:35:57Z

[FLINK-9303] [kafka] Adding support for unassign dynamically partitions 
from kafka consumer when they become unavailable
- Check for unavailable partitions recovered from state
- Using kafka consumer option to activate this validations




---