[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016970#comment-16016970
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3901


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics other than the 
> default topic, but the custom {{Partitioner}} interface does not follow these 
> semantics.
> The partitioner's {{partition}} method is always invoked with the number of 
> partitions in the default topic, not the number of partitions of the 
> current {{targetTopic}}.
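A minimal sketch of the intended semantics, assuming a hypothetical `TopicAwarePartitioner` interface and a hypothetical metadata lookup function (neither is Flink's or Kafka's actual API). The router looks up the partitions of each record's target topic, caches them per topic, and hands those to the partitioner instead of the default topic's partitions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical interface: the partitioner is told the target topic and that topic's partition ids.
interface TopicAwarePartitioner<T> {
    int partition(T record, String targetTopic, int[] partitions);
}

// Hypothetical router mimicking what a producer sink should do for each record.
class TopicRouter<T> {
    private final TopicAwarePartitioner<T> partitioner;
    private final Function<String, int[]> partitionsFor; // hypothetical metadata lookup
    private final Map<String, int[]> partitionsByTopic = new HashMap<>();

    TopicRouter(TopicAwarePartitioner<T> partitioner, Function<String, int[]> partitionsFor) {
        this.partitioner = partitioner;
        this.partitionsFor = partitionsFor;
    }

    int route(T record, String targetTopic) {
        // Look up the target topic's partitions once, then reuse the cached array.
        int[] partitions = partitionsByTopic.computeIfAbsent(targetTopic, partitionsFor);
        return partitioner.partition(record, targetTopic, partitions);
    }
}

// Simple hash partitioner that picks among the partitions of the *target* topic.
class HashPartitioner implements TopicAwarePartitioner<String> {
    @Override
    public int partition(String record, String targetTopic, int[] partitions) {
        // Math.floorMod keeps the index non-negative even for negative hash codes.
        return partitions[Math.floorMod(record.hashCode(), partitions.length)];
    }
}
```

With this shape, the same key can land on partition 3 of a 6-partition topic and on partition 1 of a 2-partition default topic, because each call sees the partitions of the topic the record is actually written to.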



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016783#comment-16016783
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3901
  
@gyfora Thanks for your suggestion; I agree with you that problems which 
cause errors for users should be addressed at a higher priority. 

Thank you for merging this :) @tzulitai 




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016028#comment-16016028
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3901
  
I'm currently waiting for a Travis run locally for 
https://github.com/tzulitai/flink/tree/FLINK-6288.
That branch also includes more tests and some cleanup tweaks for the feature.

Once it gives green, will merge to `master` and `release-1.3.0`.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015796#comment-16015796
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3901
  
Thanks @tzulitai. I don't feel very strongly either way; I am just 
concerned for other users. I'll leave this decision to you, as I know you are already 
flooded with other work around the release.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015725#comment-16015725
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3901
  
I think you've made a good point Gyula.
Alright, let me try to stretch a bit and merge this for 1.3.0 also.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015517#comment-16015517
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3901
  
On the other hand, this is potentially causing major data skew or errors for 
anyone who is using dynamic topics (they might not even be aware of 
it).
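To make the skew/error risk concrete, here is a small worked example, assuming a plain modulo partitioner and hypothetical topic sizes. If the partitioner is told "2 partitions" (the default topic) while the record actually goes to an 8-partition dynamic topic, only partitions 0 and 1 of that topic ever receive data. In the opposite case (8-partition default topic, 2-partition target), indices 2 to 7 do not exist in the target topic, which surfaces as errors:

```java
import java.util.Set;
import java.util.TreeSet;

class SkewDemo {
    // Modulo partitioning over the partition count handed to the partitioner.
    static int modPartition(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    // Distinct partition indices chosen for keys 0..numKeys-1 when the partitioner
    // believes the topic has countSeenByPartitioner partitions.
    static Set<Integer> usedPartitions(int numKeys, int countSeenByPartitioner) {
        Set<Integer> used = new TreeSet<>();
        for (int key = 0; key < numKeys; key++) {
            used.add(modPartition(key, countSeenByPartitioner));
        }
        return used;
    }

    // Number of chosen indices that do not exist in a topic with actualCount partitions.
    static long invalidIndices(Set<Integer> used, int actualCount) {
        return used.stream().filter(p -> p >= actualCount).count();
    }
}
```

For example, `usedPartitions(1000, 2)` yields only `{0, 1}` even if the target topic has 8 partitions, and `invalidIndices(usedPartitions(1000, 8), 2)` counts the 6 indices that a 2-partition target topic cannot accept.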




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015515#comment-16015515
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3901
  
Well, I can understand that, but it will mean that we have to keep running a 
custom build, because there is no nice way to work around this.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015499#comment-16015499
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3901
  
@zjureel I'll be merging this only for 1.4-SNAPSHOT. The main reason is 
that there is a lot of deprecation / API change with this, and I'd like to 
ship it along with other API changes we had in mind to make sure they go nicely 
together.

This is my highest priority at the moment, and I'll make sure it gets 
into 1.4-SNAPSHOT as soon as possible.

@gyfora I understand that this was sort of a blocker for you, but I would 
prefer not to rush the changes into the release. I hope you can understand.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015411#comment-16015411
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3901
  
This looks good! I'll do some final tweaks to the styling of the code and 
then merge this :)




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16012079#comment-16012079
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3901
  
@tzulitai I agree with you that the `ExecutorService` is really unnecessary. I 
will update the code and remove the `ExecutorService`.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011756#comment-16011756
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3901
  
@zjureel then I would suggest removing the use of `ExecutorService` for 
the timeout. In what other cases would that be required?




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011666#comment-16011666
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3901
  
Yes, users can configure the timeout and the number of retries for fetching Kafka 
metadata, independently of Kafka's own configuration. @tzulitai 




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010516#comment-16010516
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3901
  
@zjureel my main concern is the need for our own timeout when Kafka already has the 
`max.block.ms` configuration. Is this the max timeout you're talking 
about?
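For reference, `max.block.ms` is a standard Kafka producer setting that bounds how long `KafkaProducer.send()` and `KafkaProducer.partitionsFor()` may block, for example while topic metadata is unavailable. A minimal sketch of passing it through the producer `Properties`; the broker address and timeout value are placeholders, and building the configuration needs no Kafka dependency:

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Builds producer properties that bound metadata waits using Kafka's own setting,
    // rather than wrapping the metadata fetch in a separate Flink-side timeout.
    static Properties producerProps(String bootstrapServers, long maxBlockMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Upper bound on how long send() and partitionsFor() may block,
        // e.g. while waiting for topic metadata.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }
}
```

A call such as `producerProps("localhost:9092", 10_000L)` would then be handed to the producer, so a metadata fetch for a new target topic fails after roughly 10 seconds instead of hanging indefinitely.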




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010270#comment-16010270
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3901
  
@tzulitai I created the new PR here for issue 6288. As discussed in 
https://github.com/apache/flink/pull/3766, there are two ways to control the 
timeout of fetching Kafka metadata:

1. Depend on Kafka's own timeout mechanism. When fetching partition metadata 
from Kafka, the relevant timeout configurations should be set.
2. Use a `Future` to fetch the partition metadata from Kafka, with a timeout 
that the user can set through configuration.

In the new API, Kafka metadata may be fetched before data is sent, and 
problems such as network or disk failures may still block sending even if the 
user sets Kafka's request timeout. So in this PR, I use a `Future` to control 
the timeout of fetching Kafka metadata.

The problem may not be that complicated, and we could use Kafka's timeout mechanism 
directly. What do you think? Thanks.
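Option 2 above can be sketched with the standard `java.util.concurrent` API. The `fetchPartitions` callable is a hypothetical stand-in for a blocking metadata call (such as a producer's `partitionsFor(topic)`), and `timeoutMs` stands in for a user-configurable timeout; this illustrates the technique, not the code in the PR:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedMetadataFetch {
    // Runs the (possibly blocking) metadata fetch on the executor and waits at most timeoutMs.
    static int[] fetchWithTimeout(ExecutorService executor, Callable<int[]> fetchPartitions, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<int[]> future = executor.submit(fetchPartitions);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker so a hung fetch does not keep occupying the thread.
            future.cancel(true);
            throw e;
        }
    }
}
```

The trade-off raised later in the thread is visible here: this needs an extra thread pool per producer, whereas Kafka's `max.block.ms` already bounds the same wait inside the client.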






[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010249#comment-16010249
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon closed the pull request at:

https://github.com/apache/flink/pull/3766




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010248#comment-16010248
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai I have created a new PR, 
https://github.com/apache/flink/pull/3901, and I will close this one. You 
can review the code there, thanks.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010246#comment-16010246
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/3901

[FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked 
with number of partitions of default topic


Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3901.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3901


commit 301dfc6247645b690b4508abb2245ce5990321c0
Author: zjureel 
Date:   2017-05-15T09:41:47Z

[FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked 
with number of partitions of default topic






[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010019#comment-16010019
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your suggestion, and I think you are right. I will 
create a new PR from master and cherry-pick my commits for this issue soon.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010003#comment-16010003
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel the rebases don't seem to have been done correctly. The PR should contain 
only the diff commits.
I'm not sure what went wrong, but perhaps the easiest way right now is to 
cherry-pick your diff commits onto a new branch checked out from the latest master.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010001#comment-16010001
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel thanks. You would need a proper rebase: `git rebase master` when 
you finish your feature branch, instead of merging the latest master.

Regarding timeout: doesn't the Kafka client have built-in timeout 
functionality?




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009993#comment-16009993
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your reply. 

For 1, the `ExecutorService` is used to control the timeout of fetching Kafka 
partitions. When fetching Kafka partitions, a `Future` is created and 
executed in the `ExecutorService`, which waits for the configured number of 
milliseconds and throws an exception on timeout.
For 2, I have deprecated the constructors of the 08 / 09 / 010 producers that take a 
`KafkaPartitioner` parameter and added equivalent constructors that take 
`FlinkKafkaPartitioner`.
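Deprecating the old constructors while keeping them working is commonly done by wrapping the legacy partitioner in an adapter that implements the new contract. A sketch with hypothetical, simplified types (`LegacyPartitioner`, `NewPartitioner`, `LegacyPartitionerAdapter`, `ProducerSketch`); these are not the actual signatures of Flink's `KafkaPartitioner` or `FlinkKafkaPartitioner`:

```java
// Hypothetical old-style partitioner: only sees a partition count.
abstract class LegacyPartitioner<T> {
    abstract int partition(T record, int numPartitions);
}

// Hypothetical new-style partitioner: sees the target topic and its partition ids.
abstract class NewPartitioner<T> {
    abstract int partition(T record, String targetTopic, int[] partitions);
}

// Adapter so that legacy partitioners keep working behind the new contract.
class LegacyPartitionerAdapter<T> extends NewPartitioner<T> {
    private final LegacyPartitioner<T> delegate;

    LegacyPartitionerAdapter(LegacyPartitioner<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    int partition(T record, String targetTopic, int[] partitions) {
        // The legacy result is treated as an index into the target topic's partition list.
        return partitions[delegate.partition(record, partitions.length)];
    }
}

class ProducerSketch<T> {
    final String defaultTopic;
    final NewPartitioner<T> partitioner;

    ProducerSketch(String defaultTopic, NewPartitioner<T> partitioner) {
        this.defaultTopic = defaultTopic;
        this.partitioner = partitioner;
    }

    /** @deprecated use {@link #ProducerSketch(String, NewPartitioner)} instead. */
    @Deprecated
    ProducerSketch(String defaultTopic, LegacyPartitioner<T> partitioner) {
        // Delegate to the new constructor so both paths share the topic-aware logic.
        this(defaultTopic, new LegacyPartitionerAdapter<>(partitioner));
    }
}
```

Routing the deprecated constructor through the adapter means existing user partitioners also start seeing the target topic's partition count, without any change on their side.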

I see that the code in apache/flink master was modified quite heavily a few 
days ago, and I will try to rebase onto these changes soon. I think you can review 
the PR after that, thank you.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009199#comment-16009199
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
It seems the relevant commits are e6ec702 and 64af26e.
Let me try to resolve this. :)




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009183#comment-16009183
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@zjureel could you rebase the PR on the latest master? Otherwise I cannot 
review the PR like this.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002317#comment-16002317
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
@fanyon I'll finally have some time to get back to this PR this week 
(perhaps over the next 2 days). Thanks a lot for your patience ...

@gyfora I'm personally +1 on trying to get this into the release, because it 
really is a self-contained change, but I think it will probably depend on the 
status of the 1.3 release in the end.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000631#comment-16000631
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai should we try to get this in the release?




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15994214#comment-15994214
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on a diff in the pull request:

https://github.com/apache/flink/pull/3766#discussion_r114466534
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
}
}
 
+   protected int[] getPartitionsByTopic(String topic, 
KafkaProducer producer) {
+   Future future = executor.submit(new 
PartitionMetaTask(topic, producer));
+
+   try {
+   return future.get(kafkaMetaTimeoutMs, 
TimeUnit.MILLISECONDS);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
--- End diff --

Yes, retrying here would be nicer. I'll fix it, thanks :)




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992651#comment-15992651
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/3766#discussion_r114282783
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
}
}
 
+   protected int[] getPartitionsByTopic(String topic, 
KafkaProducer producer) {
+   Future future = executor.submit(new 
PartitionMetaTask(topic, producer));
+
+   try {
+   return future.get(kafkaMetaTimeoutMs, 
TimeUnit.MILLISECONDS);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
--- End diff --

Should we maybe retry here a few times?
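
A bounded retry around the metadata fetch could look roughly like this. It is a generic sketch, not the code that was eventually merged: the `Retry.withRetries` helper and its parameters (`maxAttempts`, `backoffMs`) are hypothetical names chosen for the example, and the fixed backoff is an arbitrary simplification.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries an action a fixed number of times before giving up.
final class Retry {
    private Retry() {}

    // Calls the action up to maxAttempts times, sleeping backoffMs between failed attempts,
    // and rethrows the last failure if every attempt fails.
    static <T> T withRetries(Callable<T> action, int maxAttempts, long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }
}
```

In the producer, the `action` would be the timeout-bounded `future.get(...)` call from the diff, so each attempt is individually bounded and the total blocking time is roughly `maxAttempts * (timeout + backoff)`.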




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992525#comment-15992525
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
As discussed above, I have updated the API and code. Please review the 
changes when you have time, and feel free to give me any suggestions, thanks! 
@gyfora @tzulitai 




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986328#comment-15986328
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
I like the proposed API, and I agree that it's probably best to keep the 
old behaviour for the deprecated API.

I don't think fetching the Kafka partition info should be a huge problem, as 
it shouldn't happen too often, and Kafka should be able to return the info if 
you can write to it. We of course need some timeout/retry mechanism so that we 
don't fail unnecessarily.

In its current state, the producer itself is not very resilient to errors: it 
can't really handle the async errors, it just rethrows them and fails. 




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986298#comment-15986298
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@tzulitai Thank you for your reply. For 2, I think a separate issue can be 
created later.

For 1, it is indeed a problem, because a slow metadata fetch will block the 
running job. There are two possible approaches:

1. Rely on Kafka's own timeout mechanism: set the relevant timeout 
configurations when fetching partition metadata from Kafka.
2. Fetch the Kafka partition metadata through a `Future`, with a timeout that 
the user can configure.

With the first approach, problems may still occur due to network issues and 
other causes, so I'm inclined to use the second approach. 
cc @gyfora 
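
For comparison, the first approach amounts to setting a client-side timeout in the producer properties. This sketch assumes a 0.9+ `kafka-clients` producer, where `max.block.ms` bounds how long `send()` and `partitionsFor()` may block waiting for metadata (older 0.8.x clients used a different setting); the helper class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: copies the user's producer config and adds a metadata blocking bound.
final class ProducerTimeoutConfig {
    private ProducerTimeoutConfig() {}

    static Properties withMetadataTimeout(Properties base, long maxBlockMs) {
        Properties props = new Properties();
        props.putAll(base);
        // Kafka 0.9+ producer setting bounding how long send()/partitionsFor() may block.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }
}
```

As noted above, this relies entirely on the client honouring its own timeout, which is why a `Future`-based bound on the Flink side was preferred.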




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986135#comment-15986135
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
One thing to be careful with, though:
Since we're now querying Kafka for partition metadata within the `invoke` 
method, the query must be handled robustly so that it doesn't block the whole 
stream at the Kafka sink and cause unexpectedly long checkpoint times.

Most notably, we need to consider the corner cases where Kafka isn't 
cooperating nicely:
1. How do we handle an arbitrarily long response time when fetching the 
partition metadata?
2. How do we handle the case where the returned partitions are incomplete 
because some Kafka brokers are temporarily unavailable?

For 2., I can also foresee a separate "partitions update thread" that 
refreshes the `Map` cache continuously at a fixed interval. This could also 
evolve into a `FlinkKafkaPartitioner` that is given a dynamically changing 
`int[] partitions` when its `partition` method is invoked.

Perhaps we shouldn't include that in this PR, as it's orthogonal to the 
API change. Just some food for thought :)
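
The "partitions update thread" idea could be sketched roughly as follows. This is an illustration under assumptions, not Flink code: `PartitionCache` is a hypothetical class, the `Function<String, int[]>` fetcher stands in for the real metadata call, and rejecting a refresh that reports fewer partitions than cached is one possible heuristic for the "incomplete response" case (it relies on Kafka not supporting a reduction of a topic's partition count).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical topic -> partitions cache refreshed by a background thread.
class PartitionCache implements AutoCloseable {
    private final Map<String, int[]> cache = new ConcurrentHashMap<>();
    private final Function<String, int[]> fetcher; // stand-in for the Kafka metadata lookup
    private final ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();

    PartitionCache(Function<String, int[]> fetcher, long refreshIntervalMs) {
        this.fetcher = fetcher;
        refresher.scheduleAtFixedRate(this::refreshAll, refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    // Hot path for invoke(): fetches synchronously only the first time a topic is seen.
    int[] partitionsFor(String topic) {
        return cache.computeIfAbsent(topic, fetcher);
    }

    // Runs on the background thread for every cached topic.
    void refreshAll() {
        for (String topic : cache.keySet()) {
            try {
                int[] fresh = fetcher.apply(topic);
                int[] old = cache.get(topic);
                // Treat a shrinking partition list as an incomplete response and keep the old entry.
                if (fresh != null && (old == null || fresh.length >= old.length)) {
                    cache.put(topic, fresh);
                }
            } catch (RuntimeException e) {
                // Keep the stale entry; an uncaught exception would cancel all future runs.
            }
        }
    }

    @Override
    public void close() {
        refresher.shutdownNow();
    }
}
```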




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986124#comment-15986124
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
It's been nice following the discussion here :) Let me wrap up the discussion so far:

The old way -
```
interface KafkaPartitioner<T> {
    void open(int[] partitions, int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, int numPartitions);
}
```

The (last) proposed new way -
```
interface FlinkKafkaPartitioner<T> {
    void open(int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
and keep an internal cache of partitioner information: `Map`.
The `PartitionerInfo` can actually just be the partition id array; I don't 
think we need another wrapper class if we only need a single 
`FlinkKafkaPartitioner` per subtask for all (including dynamic) topics.
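
To make the proposed shape concrete, here is a hypothetical implementation of the new interface as summarized above (the interface is restated so the example is self-contained; the final merged API may differ). `SubtaskPinnedPartitioner` is an invented example that pins each sink subtask to one partition of whichever topic a record is routed to.

```java
// The proposed interface, restated from the summary above.
interface FlinkKafkaPartitioner<T> {
    void open(int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Hypothetical example implementation: one partition per subtask, per target topic.
class SubtaskPinnedPartitioner<T> implements FlinkKafkaPartitioner<T> {
    private int subtaskIndex = -1;

    @Override
    public void open(int subtaskIndex, int numSubtasks) {
        if (subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException("invalid subtask index " + subtaskIndex);
        }
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("no partitions known for topic " + targetTopic);
        }
        // The array belongs to targetTopic, so the result is valid for any routed topic.
        return partitions[subtaskIndex % partitions.length];
    }
}
```

Because the partition array is passed per call, the same instance serves the default topic, other fixed topics, and dynamically created ones, which is the point of the proposal.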

I like the proposal for the new partitioner, as users then do not need to 
provide multiple partitioners. The only question is how well this works for 
the general use case, because implementations of the new `partition` method 
then need to handle different topics (which probably makes sense, because we 
want to treat topics as dynamic in general anyway ..). The new way would also 
allow us to handle upscaled target topics in the future.

For migration, i.e. the dummy wrapper delegation, I think we should just 
mimic the wrong, old behaviour. That is how it has always behaved anyway, so 
we should not alter the behaviour if the user is still using the old API. The 
deprecation and Javadoc messages are responsible for pushing users to switch 
to the new API.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985934#comment-15985934
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora For the method `int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition 
count of the target topic can be passed in. However, the KafkaPartitioner's 
partition id array is initialized in `void open(int parallelInstanceId, int 
parallelInstances, int[] partitions)`, which is executed only once. So yes, the 
problem with dynamically created topics will remain for users who keep the 
older KafkaPartitioner API in their existing jobs, and I find it hard to solve 
this problem completely.

What do you think of this? @tzulitai 




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985138#comment-15985138
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
I think this is reasonable, as the current implementation doesn't work for 
dynamically created topics. (We should also deprecate the current one.)

But let's hear what others say :)




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984528#comment-15984528
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
The new API should live in a new class, such as FlinkKafkaPartitioner. Older 
KafkaPartitioner implementations would be wrapped by a 
FlinkKafkaAdpterPartitioner that extends FlinkKafkaPartitioner; it holds the 
default topic id/partitions and the KafkaPartitioner delegate, and maps calls on 
the new API onto the current one. Of course, as it is now, the default topic's 
partitions would be used for all the different target topics in 
FlinkKafkaAdpterPartitioner.
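
The delegation idea can be sketched as follows. The class name `LegacyPartitionerAdapter` is hypothetical (it plays the role of the FlinkKafkaAdpterPartitioner described above), and both interfaces are simplified restatements of the old and proposed APIs from this thread, not the exact Flink signatures.

```java
// Simplified stand-in for the deprecated API described in the thread.
interface KafkaPartitioner<T> {
    void open(int parallelInstanceId, int parallelInstances, int[] partitions);
    int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}

// The new API as proposed in the thread.
interface FlinkKafkaPartitioner<T> {
    void open(int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Hypothetical adapter that keeps the old default-topic behaviour for deprecated partitioners.
class LegacyPartitionerAdapter<T> implements FlinkKafkaPartitioner<T> {
    private final KafkaPartitioner<T> delegate;
    private final int[] defaultTopicPartitions;

    LegacyPartitionerAdapter(KafkaPartitioner<T> delegate, int[] defaultTopicPartitions) {
        this.delegate = delegate;
        this.defaultTopicPartitions = defaultTopicPartitions.clone();
    }

    @Override
    public void open(int subtaskIndex, int numSubtasks) {
        // The old partitioner only ever sees the default topic's partitions, as before.
        delegate.open(subtaskIndex, numSubtasks, defaultTopicPartitions);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Deliberately ignores targetTopic/partitions to mimic the old behaviour.
        return delegate.partition(record, key, value, defaultTopicPartitions.length);
    }
}
```

For example, a subtask with index 6 using an old modulo-style partitioner and a 4-partition default topic gets partition 2 even when the record targets a 2-partition topic; that out-of-range result is exactly the legacy bug the adapter intentionally preserves.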




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984441#comment-15984441
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
How would this new API map to the current one in terms of backwards 
compatibility?




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984402#comment-15984402
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora Thanks for your comment, and you raise a very good point. I 
originally assumed that users would know all the output topics when the job is 
submitted, but in real business scenarios, the data may be written to 
dynamically generated topics. 

To support dynamically generated topics, I propose to adjust the open and 
partition APIs of KafkaPartitioner as follows:
1. The open method: remove the int[] partitions parameter; it will be called 
once for each partitioner.
public void open(int parallelInstanceId, int parallelInstances)
2. The partition method: add the target topic and int[] partitions parameters.
public int partition(T next, byte[] serializedKey, byte[] serializedValue, 
String topic, int[] partitions)

@gyfora @tzulitai What do you think of this? Please feel free to give any 
suggestions, thanks!





[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982630#comment-15982630
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
Hi,
Thanks for the PR!

The first problem I noticed with this approach is that it will not work if 
users want to partition dynamically created topics (my use case actually). 

We should have a default partitioner that could be applied to the unmatched 
topics and would always pass the correct partition number.
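
A per-topic partitioner map with a default fallback, as suggested here, might look like the sketch below. `TopicPartitioner` and `PartitionerRegistry` are hypothetical names; `addTopicPartitioner` mirrors the method name from the original PR description. The key property is that whichever partitioner is chosen receives the partition count of the actual target topic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical partitioner shape: it receives the partition count of the *target* topic.
interface TopicPartitioner {
    int partition(byte[] key, int numPartitions);
}

class PartitionerRegistry {
    private final Map<String, TopicPartitioner> topicPartitioners = new HashMap<>();
    private final TopicPartitioner defaultPartitioner;

    PartitionerRegistry(TopicPartitioner defaultPartitioner) {
        this.defaultPartitioner = defaultPartitioner;
    }

    void addTopicPartitioner(String topic, TopicPartitioner partitioner) {
        topicPartitioners.put(topic, partitioner);
    }

    // Unregistered (e.g. dynamically created) topics fall back to the default partitioner.
    int partition(String targetTopic, byte[] key, int numPartitionsOfTargetTopic) {
        TopicPartitioner p = topicPartitioners.getOrDefault(targetTopic, defaultPartitioner);
        return p.partition(key, numPartitionsOfTargetTopic);
    }
}
```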




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982531#comment-15982531
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3766
  
Thanks for the PR @fanyon. I'll try to look at the changes soon.




[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982277#comment-15982277
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

GitHub user fanyon opened a pull request:

https://github.com/apache/flink/pull/3766

[FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked 
with number of partitions of default topic

1. Add an extra API, addTopicPartitioner, which users can call to register a 
partitioner for a specific topic
2. Add topicPartitionerMap in FlinkKafkaProducerBase to store each topic and 
its partitioner
3. Add PartitionerInfo to manage the topic and partitioner info


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fanyon/flink FLINK-6288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3766.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3766


commit a525fe605c25ce2e3c8c30cbc7c60542243c0a18
Author: mengji.fy 
Date:   2017-04-24T06:16:48Z

[FLINK-6288] fix target topic uses partitioner of default topic

commit 071e06c00e8a2346d4ebcede8784f1ada5457da2
Author: mengji.fy 
Date:   2017-04-25T02:03:08Z

add serialVersionUID field in PartitionerInfo






[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-23 Thread Fang Yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980636#comment-15980636
 ] 

Fang Yong commented on FLINK-6288:
--

Thanks for providing the mailing list thread for this issue; it is very 
helpful. As discussed on the mailing list, I agree with adding an extra API 
to support a partitioner for each topic. I think a Map field named topicPartitionerMap should be added to FlinkKafkaProducerBase, 
where the PartitionerInfo POJO contains fields such as topic/partitions. 
Users can call an extra API named addTopicPartitioner in FlinkKafkaProducerBase 
to register a specific topic and its partitioner; all topics and partitioners in the 
topicPartitionerMap will be initialized in open() of FlinkKafkaProducerBase. 
When new data arrives at the sink and its target topic is in the 
topicPartitionerMap, that topic's own Partitioner will be used to partition the 
data.
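A minimal Java sketch of the per-topic registry proposed in this comment. `topicPartitionerMap`, `PartitionerInfo`, `addTopicPartitioner`, and initialization in `open()` come from the comment; everything else (`TopicPartitioner`, `PerTopicPartitioningSink`, `partitionFor`, storing the partitioner inside `PartitionerInfo`, and the fallback for unregistered topics) is a hypothetical simplification, not the code that was eventually merged. The partition-count lookup is a plain function standing in for Kafka metadata; in a real producer it might be backed by Kafka's `KafkaProducer#partitionsFor(topic)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Demo entry point (declared first so a single-file launcher picks it up).
class PerTopicPartitionerDemo {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("events", 8); // hypothetical topics and partition counts
        counts.put("audit", 2);

        PerTopicPartitioningSink<Integer> sink = new PerTopicPartitioningSink<Integer>(
            (r, n) -> Math.floorMod(r, n), counts::get);
        // "audit" gets its own partitioner: always the last partition.
        sink.addTopicPartitioner("audit", (r, n) -> n - 1);
        sink.open();

        System.out.println(sink.partitionFor(5, "audit"));   // prints 1
        System.out.println(sink.partitionFor(13, "events")); // prints 5
    }
}

// Hypothetical, simplified partitioner contract (not Flink's real interface).
interface TopicPartitioner<T> {
    int partition(T record, int numPartitions);
}

// PartitionerInfo POJO from the proposal; holding the partitioner here is an assumption.
class PartitionerInfo<T> {
    final String topic;
    final TopicPartitioner<T> partitioner;
    int numPartitions = -1; // resolved in open()

    PartitionerInfo(String topic, TopicPartitioner<T> partitioner) {
        this.topic = topic;
        this.partitioner = partitioner;
    }
}

class PerTopicPartitioningSink<T> {
    private final TopicPartitioner<T> defaultPartitioner;
    private final Function<String, Integer> partitionCountLookup; // stand-in for Kafka metadata
    private final Map<String, PartitionerInfo<T>> topicPartitionerMap = new HashMap<>();

    PerTopicPartitioningSink(TopicPartitioner<T> defaultPartitioner,
                             Function<String, Integer> partitionCountLookup) {
        this.defaultPartitioner = defaultPartitioner;
        this.partitionCountLookup = partitionCountLookup;
    }

    // Proposed extra API: register a dedicated partitioner for one topic.
    void addTopicPartitioner(String topic, TopicPartitioner<T> partitioner) {
        topicPartitionerMap.put(topic, new PartitionerInfo<>(topic, partitioner));
    }

    // Proposed: resolve the partition count of every registered topic once, at open time.
    void open() {
        for (PartitionerInfo<T> info : topicPartitionerMap.values()) {
            info.numPartitions = partitionCountLookup.apply(info.topic);
        }
    }

    // Use the topic's own partitioner if one is registered; otherwise (assumption)
    // fall back to the default partitioner with the target topic's partition count.
    int partitionFor(T record, String targetTopic) {
        PartitionerInfo<T> info = topicPartitionerMap.get(targetTopic);
        if (info != null) {
            return info.partitioner.partition(record, info.numPartitions);
        }
        return defaultPartitioner.partition(record, partitionCountLookup.apply(targetTopic));
    }
}
```

Resolving counts once in `open()` follows the proposal, but in this sketch partitions added to a topic while the job runs would not be picked up for registered topics; refreshing the lookup periodically would be one alternative.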



[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-21 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978308#comment-15978308
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6288:


Sure, please go ahead. And thanks for picking this up!

By the way, just to make sure you're in sync with the whole picture of this 
JIRA, here's the mailing list thread where the discussion of this issue took 
place: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Possible-bug-in-Kafka-producer-partitioning-logic-td16972.html.

cc [~rmetzger], in case you would like to chime in on any early discussions here, 
as I think this would require a change in our Kafka sink's custom partitioner 
API (see ML thread for details).



[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-21 Thread Fang Yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978304#comment-15978304
 ] 

Fang Yong commented on FLINK-6288:
--

Hi [~tzulitai], I'll pick up this issue if nobody else is working on it, thanks.
