[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-15 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai I have created a new PR, https://github.com/apache/flink/pull/3901,
and will close this one. You can review the code there, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-15 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your suggestion, and I think you are right. I will
create a new PR from master and cherry-pick my commits for this issue soon.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel the rebases don't seem to have been done correctly. The PR should
contain only the diff commits.
I'm not sure what went wrong, but perhaps the easiest way right now is to
cherry-pick your diff commits onto a new branch checked out from the latest master.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel thanks. You would need a proper rebase: `git rebase master` when 
you finish your feature branch, instead of merging the latest master.

Regarding timeout: doesn't the Kafka client have built-in timeout 
functionality?
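For reference, Kafka 0.9+ producer clients do expose their own blocking bounds: `max.block.ms` caps how long `KafkaProducer.send()` and `KafkaProducer.partitionsFor()` may block (for example while waiting for metadata), and `request.timeout.ms` caps how long the client waits for a broker response. A minimal sketch of setting them; the helper class and the values are hypothetical and not part of Flink.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink): builds producer properties that bound
// how long the Kafka client itself may block. Assumes Kafka 0.9+ clients.
final class ProducerTimeoutConfig {
    private ProducerTimeoutConfig() {}

    static Properties withTimeouts(String bootstrapServers, long maxBlockMs, int requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Upper bound on how long send() and partitionsFor() may block, e.g. waiting for metadata.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        // Upper bound on how long the client waits for the response to a single request.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }
}
```

These properties would then be handed to the producer's configuration as usual.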



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-14 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your reply.

For 1, the `ExecutorService` is used to bound the time spent fetching Kafka
partitions. When the partitions are fetched, a `Future` is created and executed
in the `ExecutorService`; the caller waits up to a given number of milliseconds
and throws an exception on timeout.
For 2, I have deprecated the constructors of the 08 / 09 / 010 producers that
take a `KafkaPartitioner` parameter and added equivalent constructors that take
a `FlinkKafkaPartitioner`.

I noticed that the code on apache/flink master changed substantially a few days
ago, and I will try to rebase onto those changes soon. Could you review these
points after that? Thank you.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
It seems the relevant commits are e6ec702 and 64af26e.
Let me try to resolve this :)



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel could you rebase the PR on the latest master? Otherwise I cannot 
review the PR like this.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-09 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@fanyon I'll finally have some time to get back to this PR this week 
(perhaps over the next 2 days). Thanks a lot for your patience ...

@gyfora I'm personally +1 on trying to get this into the release, because it
really is self-contained, but in the end it will probably depend on the
status of the 1.3 release.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-08 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai should we try to get this in the release?



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-05-02 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
As discussed above, I have updated the API and code. Please review the
changes when you have time, and feel free to give me any suggestions, thanks!
@gyfora @tzulitai



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-27 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
I liked the proposed API, and I agree that it's probably best to keep the
old behaviour for the deprecated API.

I don't think fetching the Kafka partition info should be a huge problem, as
it shouldn't happen too often, and Kafka should be able to return the info if
you can write to it. Of course, we need some timeout/retry mechanism so that we
don't fail unnecessarily.

In its current state the producer itself is not very resilient to errors: it
can't really handle the async errors, it just rethrows them and fails.
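A minimal sketch of the kind of retry mechanism mentioned above, assuming the metadata lookup can be wrapped in a `Supplier`. `BoundedRetry` and its parameters are hypothetical, and the linear backoff is just one possible choice.

```java
import java.util.function.Supplier;

// Hypothetical helper: retries a (metadata) lookup a bounded number of times,
// so that a single transient failure does not fail the sink.
final class BoundedRetry {
    private BoundedRetry() {}

    /** Calls {@code action} up to {@code maxAttempts} times, sleeping between failed attempts. */
    static <V> V callWithRetries(Supplier<V> action, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleepQuietly(backoffMillis * attempt); // linear backoff: 1x, 2x, 3x, ...
                }
            }
        }
        throw last; // every attempt failed: surface the most recent error
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```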



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-27 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your reply. For 2, I think a separate issue could be
created later.

For 1, it really is a problem, because it can block the running job. There
are two possible ways:

1. Rely on Kafka's own timeout mechanism: when fetching partition metadata
from Kafka, the relevant timeout configurations should be set.
2. Fetch the Kafka partition metadata through a `Future`, with a timeout the
user can set via configuration.

With the first way, problems may still occur due to network and other issues,
so I lean towards the second way.
cc @gyfora
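A minimal sketch of the second way, using only `java.util.concurrent`: the lookup runs as a `Future` on a dedicated thread and the caller waits at most a configurable timeout via `Future.get(timeout, unit)`. `TimedPartitionFetcher` is a hypothetical name, and the `Callable` stands in for the actual Kafka metadata query. Note that `cancel(true)` only frees the worker thread if the lookup responds to interruption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a possibly slow partition lookup on its own thread
// and gives up after a fixed timeout instead of blocking the sink indefinitely.
final class TimedPartitionFetcher implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "partition-metadata-fetcher");
        t.setDaemon(true); // do not keep the JVM alive because of this helper
        return t;
    });
    private final long timeoutMillis;

    TimedPartitionFetcher(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    int[] fetch(Callable<int[]> fetchPartitions) {
        Future<int[]> future = executor.submit(fetchPartitions);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck lookup
            throw new IllegalStateException("Fetching partitions timed out after " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Fetching partitions failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while fetching partitions", e);
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```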



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-27 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
One thing to be careful with, though:
Since we're now querying Kafka for partition metadata within the `invoke`
method, the query must be handled robustly, making sure it doesn't unexpectedly
lengthen checkpoint times by blocking the whole stream at the Kafka sink.

Most notably, we need to consider the corner cases where Kafka isn't
cooperating nicely:
1. How do we handle arbitrarily long response times when fetching the
partition metadata?
2. How do we handle the case where, because some Kafka brokers are temporarily
unavailable, the returned partition list is incomplete?

For 2., I can also foresee having a separate "partitions update
thread" that continuously refreshes the `Map` cache at a fixed
interval. This could also extend to a `FlinkKafkaPartitioner` that receives
dynamically changing `int[] partitions` when its `partition` method is invoked.

Perhaps we shouldn't include that in this PR, as it's orthogonal to the
API change. Just some food for thought :)
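A minimal sketch of such a "partitions update thread", assuming the cache maps topic names to partition-id arrays (`Map<String, int[]>`) and using a hypothetical `lookup` function in place of the real metadata query. Reads on the hot path only hit the cache; a `ScheduledExecutorService` re-queries every known topic at a fixed interval. It only guards against failed or empty responses; detecting a merely incomplete partition list would need an extra check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical topic -> partition-id cache refreshed by a background thread.
final class PartitionCache implements AutoCloseable {
    private final Map<String, int[]> cache = new ConcurrentHashMap<>();
    private final Function<String, int[]> lookup; // stand-in for the real Kafka query
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "partitions-update-thread");
        t.setDaemon(true);
        return t;
    });

    PartitionCache(Function<String, int[]> lookup, long refreshIntervalMillis) {
        this.lookup = lookup;
        scheduler.scheduleAtFixedRate(this::refreshAll, refreshIntervalMillis, refreshIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Returns cached partitions, querying synchronously only the first time a topic is seen. */
    int[] partitionsFor(String topic) {
        return cache.computeIfAbsent(topic, lookup);
    }

    /** Re-queries every known topic; keeps the old entry if a refresh fails or returns nothing. */
    void refreshAll() {
        for (String topic : cache.keySet()) {
            try {
                int[] fresh = lookup.apply(topic);
                if (fresh != null && fresh.length > 0) {
                    cache.put(topic, fresh);
                }
            } catch (RuntimeException e) {
                // Keep the previous (possibly stale) entry; the next scheduled run retries.
            }
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```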



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-27 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
Nice to follow the discussion here :) Let me wrap up the discussion so far:

The old way -
```
interface KafkaPartitioner<T> {
    void open(int[] partitions, int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, int numPartitions);
}
```

The (last) proposed new way -
```
interface FlinkKafkaPartitioner<T> {
    void open(int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
and have an internal cache of partitioner information: `Map`.
The `PartitionerInfo` can actually just be the partition id array; I don't
think we need another wrapper class if we only need a single
`FlinkKafkaPartitioner` per subtask for all (including dynamic) topics.

I like the proposal for the new partitioner, since users then do not need to
provide multiple partitioners. The only question is how well this works for
the general use case, because implementations of the new `partition`
method then need to handle different topics (which probably makes sense,
because we want to treat topics as dynamic in general anyway). The new way
would also allow us to handle upscaled target topics in the future.
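To illustrate why a single partitioner per subtask can serve every topic under the proposed API, here is a hypothetical `KeyHashPartitioner` written against a local copy of the proposed `FlinkKafkaPartitioner` interface, renamed `ProposedPartitioner` so it is not mistaken for Flink's released class. Because the target topic's partition ids arrive with every call, nothing topic-specific has to be captured in `open`.

```java
import java.util.Arrays;

// Local copy of the interface proposed above (renamed; not Flink's released class).
interface ProposedPartitioner<T> {
    void open(int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Hypothetical example: works for any topic, including ones created at runtime.
class KeyHashPartitioner<T> implements ProposedPartitioner<T> {
    private int subtaskIndex;

    @Override
    public void open(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions known for topic " + targetTopic);
        }
        // Keyed records: hash the serialized key. Unkeyed records: one fixed partition per subtask.
        int slot = (key != null)
                ? Math.floorMod(Arrays.hashCode(key), partitions.length)
                : subtaskIndex % partitions.length;
        return partitions[slot];
    }
}
```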

For migration, i.e. the dummy wrapper delegation, I think we should just
mimic the wrong, old behaviour. That is how it has always behaved, so we
should not alter the behaviour for users still on the old API. The deprecation
and Javadoc messages are responsible for pushing them towards the new API.



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora For KafkaPartitioner's
`int partition(T next, byte[] serializedKey, byte[] serializedValue, int partitionNum)`,
the correct number of partitions of the target topic can be passed in. However,
KafkaPartitioner's partition id array is initialized in
`void open(int parallelInstanceId, int parallelInstances, int[] partitions)`,
which is executed only once. So yes, the problem with dynamically created
topics will remain for users whose existing jobs use the old KafkaPartitioner
API, and I find it hard to solve this problem completely.

What do you think? @tzulitai



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
I think this is reasonable, as the current implementation doesn't work for
dynamically created topics. (We should also deprecate the current one.)

But let's hear what others say :)



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
The new API should live in a new class, such as FlinkKafkaPartitioner. Existing
KafkaPartitioner implementations would be wrapped by a FlinkKafkaAdpterPartitioner
that extends FlinkKafkaPartitioner, holds the default topic id/partitions and the
KafkaPartitioner delegate, and maps the new API onto the current one. Of course,
as it is now, the default topic's partitions would still be used for all the
different target topics in FlinkKafkaAdpterPartitioner.
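A minimal sketch of the adapter idea, using local copies of the old and proposed interfaces (renamed `LegacyKafkaPartitioner` and `NewKafkaPartitioner`) and a hypothetical class name, `LegacyPartitionerAdapter`. It opens the legacy partitioner once with the default topic's partitions and passes the default topic's partition count for every target topic, i.e. it deliberately keeps the old behaviour.

```java
// Local copy of the old API, with the signatures quoted in this thread.
interface LegacyKafkaPartitioner<T> {
    void open(int parallelInstanceId, int parallelInstances, int[] partitions);
    int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}

// Local copy of the proposed API.
interface NewKafkaPartitioner<T> {
    void open(int parallelInstanceId, int parallelInstances);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Hypothetical adapter: exposes a legacy partitioner through the new API.
class LegacyPartitionerAdapter<T> implements NewKafkaPartitioner<T> {
    private final LegacyKafkaPartitioner<T> delegate;
    private final int[] defaultTopicPartitions;

    LegacyPartitionerAdapter(LegacyKafkaPartitioner<T> delegate, int[] defaultTopicPartitions) {
        this.delegate = delegate;
        this.defaultTopicPartitions = defaultTopicPartitions.clone();
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // As before, the legacy open() only ever sees the default topic's partitions.
        delegate.open(parallelInstanceId, parallelInstances, defaultTopicPartitions);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // targetTopic and its real partitions are ignored on purpose to mimic the old behaviour.
        return delegate.partition(record, key, value, defaultTopicPartitions.length);
    }
}
```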



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
How would this new API map to the current one in terms of backwards 
compatibility?



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora Thanks for your comment. And yes, that's a very good question. I
originally assumed that users would know all output topics when the job is
submitted, but in real business scenarios, data may be written to dynamically
generated topics.

To support dynamically generated topics, I propose to adjust the open
and partition APIs of KafkaPartitioner as follows:
1. The open method: remove the `int[] partitions` parameter; it is called once
for each partitioner:
`public void open(int parallelInstanceId, int parallelInstances)`
2. The partition method: add the target topic and `int[] partitions` parameters:
`public int partition(T next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions)`

@gyfora @tzulitai What do you think? Please feel free to give any
suggestions, thanks!




[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-25 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
Hi,
Thanks for the PR!

The first problem I noticed with this approach is that it will not work if
users want to partition dynamically created topics (actually my use case).

We should have a default partitioner that could be applied to unmatched
topics and would always pass the correct number of partitions.
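A minimal sketch of that suggestion, with hypothetical names (`TopicPartitioner`, `PartitionerRegistry`): explicitly registered topics use their own partitioner, any other topic (including ones created at runtime) falls back to the default, and the caller always passes the partition count of the actual target topic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-topic partitioner: always receives the target topic's own partition count.
interface TopicPartitioner<T> {
    int partition(T record, int numPartitions);
}

// Hypothetical registry: explicit partitioners for known topics, a default for everything else.
final class PartitionerRegistry<T> {
    private final Map<String, TopicPartitioner<T>> perTopic = new HashMap<>();
    private final TopicPartitioner<T> defaultPartitioner;

    PartitionerRegistry(TopicPartitioner<T> defaultPartitioner) {
        this.defaultPartitioner = defaultPartitioner;
    }

    PartitionerRegistry<T> register(String topic, TopicPartitioner<T> partitioner) {
        perTopic.put(topic, partitioner);
        return this;
    }

    /** numPartitions must be the partition count of targetTopic itself, not of a default topic. */
    int partition(T record, String targetTopic, int numPartitions) {
        return perTopic.getOrDefault(targetTopic, defaultPartitioner).partition(record, numPartitions);
    }
}
```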



[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-25 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
Thanks for the PR @fanyon. I'll try to look at the changes soon.

