[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016970#comment-16016970 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3901

> FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the
> default topic, but the custom {{Partitioner}} interface does not follow this
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of
> partitions in the default topic, and not the number of partitions of the
> current {{targetTopic}}.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
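The issue comes down to which partition list the partitioner sees. Below is a minimal sketch of the intended contract. It assumes a hypothetical `TopicAwarePartitioner` interface, loosely modeled on the `FlinkKafkaPartitioner` mentioned later in this thread but not Flink's actual API. The producer passes the partition IDs of the record's target topic, so the chosen ID is always valid for that topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for a topic-aware partitioner (not Flink's actual API):
// the caller passes the partition IDs of the topic the record is routed to.
interface TopicAwarePartitioner<T> {
    int partition(T record, byte[] key, String targetTopic, int[] partitions);
}

// Chooses a partition by hashing the key over the target topic's partition IDs.
class KeyHashPartitioner implements TopicAwarePartitioner<String> {
    @Override
    public int partition(String record, byte[] key, String targetTopic, int[] partitions) {
        // floorMod keeps the index non-negative even when the hash is negative
        int index = Math.floorMod(Arrays.hashCode(key), partitions.length);
        return partitions[index];
    }
}

class TopicAwarePartitionerDemo {
    public static void main(String[] args) {
        TopicAwarePartitioner<String> partitioner = new KeyHashPartitioner();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // The target topic has 3 partitions, regardless of the default topic's size.
        int[] targetTopicPartitions = {0, 1, 2};
        int chosen = partitioner.partition("payload", key, "target-topic", targetTopicPartitions);
        System.out.println("chosen partition: " + chosen);
    }
}
```

The result stays in range when topics have different partition counts because the sketch hashes over `partitions.length` of the target topic, not over a count taken from the default topic.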
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016783#comment-16016783 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3901

@gyfora Thanks for your suggestion. I agree with you that problems which cause errors for users should be addressed with higher priority. Thank you for merging this issue, @tzulitai :)
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016028#comment-16016028 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3901

I'm currently waiting for a Travis run locally for https://github.com/tzulitai/flink/tree/FLINK-6288. That branch also includes more tests and some cleanup tweaks for the feature. Once it is green, I will merge it to `master` and `release-1.3.0`.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015796#comment-16015796 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3901

Thanks @tzulitai. I don't feel very strongly either way; I am just concerned for other users. I'll leave this decision to you. I know you are already flooded with other work around the release.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015725#comment-16015725 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3901

I think you've made a good point, Gyula. Alright, let me try to stretch a bit and merge this for 1.3.0 as well.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015517#comment-16015517 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3901

On the other hand, this potentially causes major data skew or errors for anyone using dynamic topics (they might not even be aware of it).
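The following self-contained simulation (not Flink code) makes the skew/error point concrete. It models the behavior described in the issue: the partition index is computed modulo the default topic's partition count, but records go to a target topic with a different count. Two simplifying assumptions apply. Integer keys stand in for key hashes, and the trailing slot of the returned histogram counts indices that do not exist in the target topic.

```java
import java.util.Arrays;

class DefaultTopicCountSkewDemo {
    // Mimics the reported behavior: the index is taken modulo the *default* topic's
    // partition count, whatever topic the record is actually sent to.
    static int buggyPartition(int keyHash, int defaultTopicPartitionCount) {
        return Math.floorMod(keyHash, defaultTopicPartitionCount);
    }

    // counts[p] = records landing on target partition p (p < targetCount);
    // counts[targetCount] = records whose index does not exist in the target topic.
    static int[] histogram(int records, int defaultCount, int targetCount) {
        int[] counts = new int[targetCount + 1];
        for (int key = 0; key < records; key++) {
            int p = buggyPartition(key, defaultCount);
            if (p < targetCount) {
                counts[p]++;
            } else {
                counts[targetCount]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Default topic: 2 partitions, target topic: 8 -> only 2 of 8 ever get data.
        System.out.println(Arrays.toString(histogram(800, 2, 8)));
        // Default topic: 8 partitions, target topic: 3 -> indices 3..7 are invalid.
        System.out.println(Arrays.toString(histogram(800, 8, 3)));
    }
}
```

With a 2-partition default topic and an 8-partition target, keys 0..799 land only on partitions 0 and 1 (400 each) and six partitions stay empty. With an 8-partition default topic and a 3-partition target, 500 of the 800 indices point at partitions that do not exist.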
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015515#comment-16015515 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3901

Well, I can understand that, but it will mean that we have to keep running a custom build, because there is no nice way to work around this.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015499#comment-16015499 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3901

@zjureel I'll be merging this only for 1.4-SNAPSHOT. The main reason is that this involves a lot of deprecation / API changes, and I'd like to ship it along with other API changes we had in mind to make sure they fit together nicely. This is my highest priority at the moment, and I'll make sure it gets into 1.4-SNAPSHOT as soon as possible.

@gyfora I understand that this was sort of a blocker for you, but I would prefer not to rush the changes into the release. I hope you can understand.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015411#comment-16015411 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3901

This looks good! I'll do some final tweaks to the styling of the code and then merge this :)
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16012079#comment-16012079 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3901

@tzulitai I agree with you that the `ExecutorService` is really unnecessary. I will update the code and remove the `ExecutorService`.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011756#comment-16011756 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3901

@zjureel Then I would suggest removing the use of `ExecutorService` for the timeout. In what other cases would it be required?
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011666#comment-16011666 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3901

Yes, users can configure the timeout and the number of retries for fetching Kafka metadata, in addition to Kafka's own configuration. @tzulitai
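A user-configurable retry count for the metadata fetch could look roughly like the sketch below. `MetadataRetry.fetchWithRetries` is a hypothetical helper and is not part of Flink or the Kafka client. The fetch in `main`, which fails twice and then succeeds, is simulated.

```java
import java.util.concurrent.Callable;

class MetadataRetry {
    // Hypothetical helper (not part of Flink or the Kafka client): invokes `fetch`
    // up to `maxAttempts` times and rethrows the last failure if every attempt fails.
    static <T> T fetchWithRetries(Callable<T> fetch, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated metadata fetch that fails twice before returning 3 partition IDs.
        int[] partitions = fetchWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("metadata not available yet");
            }
            return new int[] {0, 1, 2};
        }, 5);
        System.out.println(partitions.length + " partitions after " + calls[0] + " attempts");
    }
}
```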
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010516#comment-16010516 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3901

@zjureel My main concern is the need for our own timeout when Kafka already has the `max.block.ms` configuration. Is this the max timeout you're talking about?
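For comparison, relying on Kafka's own timeout only requires producer configuration. The Kafka producer documentation describes `max.block.ms` as bounding how long `KafkaProducer.send()` and `KafkaProducer.partitionsFor()` may block, including while metadata is unavailable. The minimal sketch below assumes the resulting `Properties` are passed as the producer config to the Flink Kafka producer. The broker address and the 10-second value are placeholders.

```java
import java.util.Properties;

class ProducerTimeoutConfig {
    // Producer properties that bound how long the Kafka producer may block.
    // Per the Kafka producer docs, max.block.ms limits blocking in send() and
    // partitionsFor(), e.g. while topic metadata is unavailable.
    static Properties producerProps(String bootstrapServers, long maxBlockMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and a 10-second upper bound.
        Properties props = producerProps("localhost:9092", 10_000L);
        System.out.println("max.block.ms=" + props.getProperty("max.block.ms"));
    }
}
```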
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010270#comment-16010270 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3901

@tzulitai I created the new PR here for issue 6288. As discussed in https://github.com/apache/flink/pull/3766, there are two ways to control the timeout of fetching Kafka metadata:

1. Rely on Kafka's own timeout mechanism: set the relevant timeout configurations when fetching partition metadata from Kafka.
2. Use a `Future` to fetch the partition metadata from Kafka, with a timeout that the user can set through configuration.

In the new API, Kafka metadata may be fetched before data is sent, and exceptions such as network or disk problems may still block sending even if the user has set Kafka's request timeout. So in this PR, I use a `Future` to control the timeout of fetching Kafka metadata. The problem may not be that complicated, though, and we could use Kafka's timeout mechanism directly. What do you think? Thanks.
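Option 2 (a `Future` with a user-configured timeout) might be sketched as follows. As a design choice of this sketch only, it adds a per-topic cache so metadata is fetched just the first time a target topic is seen. `TimedPartitionLookup` and its `fetchPartitions` function are hypothetical. In a real producer the function would wrap a metadata call such as the Kafka producer's `partitionsFor`. The plain `HashMap` cache assumes single-threaded access.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch (not the PR's code): look up a topic's partition IDs through a
// Future bounded by a user-configured timeout, caching results per target topic.
// The cache is a plain HashMap, so this assumes single-threaded use.
class TimedPartitionLookup implements AutoCloseable {
    private final Function<String, int[]> fetchPartitions; // assumed metadata call
    private final long timeoutMs;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String, int[]> cache = new HashMap<>();

    TimedPartitionLookup(Function<String, int[]> fetchPartitions, long timeoutMs) {
        this.fetchPartitions = fetchPartitions;
        this.timeoutMs = timeoutMs;
    }

    int[] partitionsFor(String topic)
            throws TimeoutException, ExecutionException, InterruptedException {
        int[] cached = cache.get(topic);
        if (cached != null) {
            return cached; // metadata for this topic was already fetched
        }
        Future<int[]> future = executor.submit(() -> fetchPartitions.apply(topic));
        try {
            int[] partitions = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            cache.put(topic, partitions);
            return partitions;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck fetch before giving up
            throw e;
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        // Simulated fetch that answers immediately with 3 partition IDs.
        try (TimedPartitionLookup lookup =
                 new TimedPartitionLookup(topic -> new int[] {0, 1, 2}, 1_000L)) {
            System.out.println(lookup.partitionsFor("target-topic").length + " partitions");
        }
    }
}
```

The thread later settles on dropping the `ExecutorService` in favor of Kafka's built-in timeout. Treat this as an illustration of the alternative under discussion, not of the merged code.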
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010249#comment-16010249 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user fanyon closed the pull request at: https://github.com/apache/flink/pull/3766
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010248#comment-16010248 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766

@tzulitai I have created a new PR, https://github.com/apache/flink/pull/3901, and I will close this one. You can review the code there. Thanks.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010246#comment-16010246 ]
ASF GitHub Bot commented on FLINK-6288:
---
GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/3901

[FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6288

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3901.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3901

commit 301dfc6247645b690b4508abb2245ce5990321c0
Author: zjureel
Date: 2017-05-15T09:41:47Z

    [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010019#comment-16010019 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3766

@tzulitai Thank you for your suggestion; I think you are right. I will soon create a new PR from master and cherry-pick my commits for this issue.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010003#comment-16010003 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766

@zjureel The rebases don't seem to have been done correctly. The PR should contain only the commits with your changes. I'm not sure what went wrong, but perhaps the easiest way right now is to cherry-pick your commits onto a new branch checked out from the latest master.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010001#comment-16010001 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766

@zjureel Thanks. You would need a proper rebase (`git rebase master`) when you finish your feature branch, instead of merging in the latest master.

Regarding the timeout: doesn't the Kafka client have built-in timeout functionality?
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009993#comment-16009993 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3766

@tzulitai Thank you for your reply.

Regarding 1: the `ExecutorService` is used to control the timeout of fetching Kafka partitions. When fetching Kafka partitions, a `Future` is created and executed in the `ExecutorService`; it waits for a number of milliseconds and throws an exception on timeout.

Regarding 2: I have deprecated the constructors of the 08 / 09 / 010 producers that take a `KafkaPartitioner` parameter, and added equivalent constructors that take a `FlinkKafkaPartitioner`.

I see that the code in apache/flink master changed considerably a few days ago, and I will try to rebase onto those changes soon. I think you can review these issues after that. Thank you.
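The deprecation approach in point 2 keeps the old constructors, deprecates them, and adds equivalents that take the new partitioner type. It can be sketched with an adapter. All names below are hypothetical stand-ins, not Flink's `KafkaPartitioner` / `FlinkKafkaPartitioner` classes. The adapter feeds a legacy, count-based partitioner the partition count of the topic the record is actually routed to.

```java
// Hypothetical stand-ins (not Flink's real classes): a legacy partitioner that only
// sees a partition count, and a newer one that also sees the target topic's partitions.
interface LegacyPartitioner<T> {
    int partition(T record, int numPartitions);
}

interface TargetTopicPartitioner<T> {
    int partition(T record, String targetTopic, int[] partitions);
}

// Adapter: runs a legacy partitioner behind the new interface, feeding it the
// partition count of the topic the record is actually routed to.
class LegacyPartitionerAdapter<T> implements TargetTopicPartitioner<T> {
    private final LegacyPartitioner<T> delegate;

    LegacyPartitionerAdapter(LegacyPartitioner<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public int partition(T record, String targetTopic, int[] partitions) {
        // The legacy contract returns an index in [0, numPartitions); map it to an ID.
        return partitions[delegate.partition(record, partitions.length)];
    }
}

// The old constructor is kept but deprecated, and delegates to the new one.
class ExampleProducer<T> {
    final String defaultTopic;
    final TargetTopicPartitioner<T> partitioner;

    ExampleProducer(String defaultTopic, TargetTopicPartitioner<T> partitioner) {
        this.defaultTopic = defaultTopic;
        this.partitioner = partitioner;
    }

    /** @deprecated use the constructor taking a {@link TargetTopicPartitioner}. */
    @Deprecated
    ExampleProducer(String defaultTopic, LegacyPartitioner<T> partitioner) {
        this(defaultTopic, new LegacyPartitionerAdapter<>(partitioner));
    }
}

class AdapterDemo {
    public static void main(String[] args) {
        LegacyPartitioner<String> legacy = (record, n) -> Math.floorMod(record.hashCode(), n);
        // Goes through the deprecated constructor, which wraps the legacy partitioner.
        ExampleProducer<String> producer = new ExampleProducer<>("default-topic", legacy);
        int id = producer.partitioner.partition("hello", "other-topic", new int[] {10, 11, 12});
        System.out.println("partition ID: " + id);
    }
}
```

Routing old constructors through the new one means existing user code keeps compiling while still receiving the target topic's partition count.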
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009199#comment-16009199 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766

Seems like the relevant commits are e6ec702 and 64af26e. Let me try to resolve this... :)
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009183#comment-16009183 ]
ASF GitHub Bot commented on FLINK-6288:
---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766

@zjureel could you rebase the PR on the latest master? Otherwise I cannot review the PR like this.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002317#comment-16002317 ] ASF GitHub Bot commented on FLINK-6288: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766
@fanyon I'll finally have some time to get back to this PR this week (perhaps over the next 2 days). Thanks a lot for your patience ...
@gyfora I'm personally +1 on trying to get this into the release, because it really is a self-contained change, but I think it will probably depend on the status of the 1.3 release in the end.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000631#comment-16000631 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766 @tzulitai should we try to get this in the release?
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15994214#comment-15994214 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on a diff in the pull request: https://github.com/apache/flink/pull/3766#discussion_r114466534
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
@@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
             }
         }
+    protected int[] getPartitionsByTopic(String topic, KafkaProducer producer) {
+        Future future = executor.submit(new PartitionMetaTask(topic, producer));
+
+        try {
+            return future.get(kafkaMetaTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
--- End diff --
Yes, retrying here would be nicer. I'll fix it, thanks :)
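The review thread asks for a few retries instead of failing on the first error. A minimal sketch of a bounded retry around the timed `Future.get` call, assuming a generic `Callable` in place of the PR's `PartitionMetaTask`; `RetryingMetaFetcher`, `fetchWithRetry` and `maxAttempts` are hypothetical names, not the code that was merged:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not the merged Flink code): runs a metadata lookup with a
// per-attempt timeout and retries a bounded number of times before failing.
final class RetryingMetaFetcher {

    private RetryingMetaFetcher() {}

    static <V> V fetchWithRetry(ExecutorService executor, Callable<V> task,
                                long timeoutMs, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Future<V> future = executor.submit(task);
            try {
                // Bound each attempt so a slow broker cannot block the sink indefinitely.
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                future.cancel(true); // abandon this attempt before retrying
                lastFailure = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while fetching metadata", e);
            }
        }
        throw new RuntimeException("Metadata lookup failed after " + maxAttempts + " attempts", lastFailure);
    }
}
```

Only the last failure is kept as the cause; a production version would probably also add a backoff between attempts.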
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992651#comment-15992651 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/3766#discussion_r114282783
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
@@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
             }
         }
+    protected int[] getPartitionsByTopic(String topic, KafkaProducer producer) {
+        Future future = executor.submit(new PartitionMetaTask(topic, producer));
+
+        try {
+            return future.get(kafkaMetaTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
--- End diff --
Should we maybe retry here a few times?
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992525#comment-15992525 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766 As discussed above, I have updated the API and code. Please review the changes when you are free, and feel free to give me any suggestions, thanks! @gyfora @tzulitai
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986328#comment-15986328 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766
I like the proposed API, and I agree that it's probably best to keep the old behaviour for the deprecated API.
I don't think fetching the Kafka partition info should be a huge problem, as it shouldn't happen too often, and Kafka should be able to return the info if you can write to it. Of course we need some timeout/retry mechanism so that we don't fail unnecessarily.
In its current state the producer itself is not very resilient to errors: it can't really handle the async errors, it will just throw them and fail.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986298#comment-15986298 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766
@tzulitai Thank you for your reply.
For 2, I think a new issue can be created later.
For 1, it really is a problem, because it will block the running job. There are two possible ways:
1. Rely on Kafka's own timeout mechanism: set the relevant timeout configurations when fetching partition metadata from Kafka.
2. Use a Future to fetch the partition metadata from Kafka, and let the user set the timeout through configuration.
With the first way, the problem may still occur because of network and other issues, so I'm inclined to use the second way. cc @gyfora
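The second option (a `Future` with a user-configurable timeout) can be sketched as below. `PartitionLookup` is a hypothetical stand-in for a blocking metadata call such as the Kafka client's `KafkaProducer#partitionsFor(String)`, and `TimedPartitionFetcher` is an illustrative name; how timeouts and failures are reported is an assumption, not the PR's implementation:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a blocking metadata call such as KafkaProducer#partitionsFor.
interface PartitionLookup {
    List<Integer> partitionIds(String topic) throws Exception;
}

final class TimedPartitionFetcher {
    private final ExecutorService executor;
    private final long timeoutMs; // user-configurable upper bound per lookup

    TimedPartitionFetcher(ExecutorService executor, long timeoutMs) {
        this.executor = executor;
        this.timeoutMs = timeoutMs;
    }

    int[] getPartitionsByTopic(String topic, PartitionLookup lookup) {
        Callable<List<Integer>> task = () -> lookup.partitionIds(topic);
        Future<List<Integer>> future = executor.submit(task);
        try {
            List<Integer> ids = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return ids.stream().mapToInt(Integer::intValue).sorted().toArray();
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the lookup that is still running
            throw new RuntimeException("Fetching partitions of " + topic + " timed out after " + timeoutMs + " ms", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Fetching partitions of " + topic + " failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while fetching partitions of " + topic, e);
        }
    }
}
```

The caller never waits longer than `timeoutMs`, regardless of what the client library does internally, which is the advantage claimed for this option over relying on Kafka-side timeouts alone.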
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986135#comment-15986135 ] ASF GitHub Bot commented on FLINK-6288: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766
One thing to be careful with, though: since we're now querying Kafka for partition metadata within the `invoke` method, the query must be handled robustly, so that it doesn't unexpectedly lengthen checkpoint times by blocking the whole stream at the Kafka sink.
Most notably, we need to consider the corner cases where Kafka isn't cooperating nicely:
1. How do we handle an arbitrarily long response time when fetching the partition metadata?
2. How do we handle the case where, because some Kafka brokers are temporarily unavailable, the returned partitions are incomplete?
For 2., I can also foresee a separate "partitions update thread" that refreshes the `Map` cache continuously at a fixed interval. This could also extend to providing a `FlinkKafkaPartitioner` with dynamically changing `int[] partitions` when invoking the `partition` method. Perhaps we shouldn't include that in this PR, as it's orthogonal to the API change. Just some food for thought :)
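The "partitions update thread" idea could look roughly like this: a topic-to-partitions map that `invoke` only reads, refreshed at a fixed rate by a `ScheduledExecutorService`. `PartitionCache` and its injected `fetcher` function are hypothetical. Keeping the previous view when a refresh returns no partitions is one possible policy; it does not detect a partially incomplete answer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical cache of topic -> partition ids, refreshed by a background thread
// so that the sink's hot path only reads the map.
final class PartitionCache {
    private final Map<String, int[]> partitionsByTopic = new ConcurrentHashMap<>();
    private final Function<String, int[]> fetcher; // assumed (timed) metadata lookup
    private final ScheduledExecutorService refresher =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "partitions-update-thread");
                t.setDaemon(true);
                return t;
            });

    PartitionCache(Function<String, int[]> fetcher) {
        this.fetcher = fetcher;
    }

    void start(long intervalMs) {
        refresher.scheduleAtFixedRate(this::refreshAll, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // The first access to a topic fetches synchronously; later accesses hit the cache.
    int[] partitionsFor(String topic) {
        return partitionsByTopic.computeIfAbsent(topic, fetcher);
    }

    void refreshAll() {
        for (String topic : partitionsByTopic.keySet()) {
            try {
                int[] fresh = fetcher.apply(topic);
                // Only replace the cached view when the fetch returned at least one partition.
                if (fresh != null && fresh.length > 0) {
                    partitionsByTopic.put(topic, fresh);
                }
            } catch (RuntimeException e) {
                // Swallowed on purpose: a failed refresh keeps the old view.
            }
        }
    }

    void close() {
        refresher.shutdownNow();
    }
}
```

Catching exceptions inside `refreshAll` matters because `scheduleAtFixedRate` suppresses all subsequent runs once a run throws.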
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986124#comment-15986124 ] ASF GitHub Bot commented on FLINK-6288: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766
Nice following the discussions here :) Let me wrap up the discussion so far:
The old way -
```
interface KafkaPartitioner<T> {
    void open(int[] partitions, int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, int numPartitions);
}
```
The (last) proposed new way -
```
interface FlinkKafkaPartitioner<T> {
    void open(int subtaskIndex, int numSubtasks);
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```
and have an internal cache of partitioner information: `Map`. The `PartitionerInfo` can actually just be the partition id array; I don't think we need another wrapper class if we just need a single `FlinkKafkaPartitioner` per subtask for all (including dynamic) topics.
I like the proposal of the new partitioner, as users then do not need to provide multiple partitioners. The only question is how well this works for the general use case, because implementations of the new `partition` method then need to handle different topics (which probably makes sense, because we want to treat topics as dynamic in general anyway ..). The new way can also allow us to handle upscaled target topics in the future.
For migration, in the dummy wrapper delegation, I think we should just mimic the wrong, old behaviour. That has always been the behaviour anyway, so we should not try to alter it if the user is still using the old API. The deprecation and Javadoc message are responsible for pushing users to change to the new API.
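To illustrate why passing `targetTopic` and its `int[] partitions` on every call fixes the original bug, here is a sketch of sink-side routing with a per-topic partition cache and a single partitioner for all topics. `TopicAwareRouter`, `route` and the injected lookup function are hypothetical; the nested interface simply mirrors the proposed signatures above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sink-side routing: one partitioner per subtask serves every topic,
// and partitions are looked up per *target* topic, then cached.
final class TopicAwareRouter {

    // Mirrors the proposed new partitioner shape from the wrap-up comment.
    interface FlinkKafkaPartitioner<T> {
        void open(int subtaskIndex, int numSubtasks);
        int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
    }

    private final Map<String, int[]> topicPartitionsMap = new HashMap<>();
    private final Function<String, int[]> fetchPartitions; // assumed metadata lookup

    TopicAwareRouter(Function<String, int[]> fetchPartitions) {
        this.fetchPartitions = fetchPartitions;
    }

    <T> int route(FlinkKafkaPartitioner<T> partitioner, T record,
                  byte[] key, byte[] value, String targetTopic) {
        // Fetch each topic's partitions once, then reuse the cached array.
        int[] partitions = topicPartitionsMap.computeIfAbsent(targetTopic, fetchPartitions);
        return partitioner.partition(record, key, value, targetTopic, partitions);
    }
}
```

With the old API the partitioner would have seen the default topic's partitions for every record; here a topic with 5 partitions and one with 2 are each partitioned against their own partition arrays.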
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985934#comment-15985934 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766
@gyfora For the method `int partition(T next, byte[] serializedKey, byte[] serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition number of the target topic can be passed. But the KafkaPartitioner's partition id array is initialized in `void open(int parallelInstanceId, int parallelInstances, int[] partitions)`, which is executed only once. So yes, the problem with dynamically created topics will still exist when users keep the older KafkaPartitioner API in their older jobs, and I find it hard to solve this problem completely. What do you think of this? @tzulitai
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985138#comment-15985138 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766 I think this is reasonable, as the current implementation doesn't work for dynamically created topics (we should also deprecate the current one). But let's hear what others say :)
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984528#comment-15984528 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766
The new API should be in a new class, such as FlinkKafkaPartitioner. Existing KafkaPartitioner implementations will be wrapped by FlinkKafkaAdpterPartitioner, which extends FlinkKafkaPartitioner, holds the default topic id/partitions and the KafkaPartitioner delegate, and maps the new API to the current one.
Of course, as it is now, the default topic's partitions will be used for the different target topics in FlinkKafkaAdpterPartitioner.
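A sketch of this delegation idea, assuming the old API shape quoted in this thread (`open(int parallelInstanceId, int parallelInstances, int[] partitions)` and `partition(T, byte[], byte[], int numPartitions)`). `KafkaPartitionerAdapter` is a hypothetical name standing in for the proposed `FlinkKafkaAdpterPartitioner`; it deliberately keeps the old behaviour of passing the default topic's partition count.

```java
import java.io.Serializable;

// Old-style API as summarised in this thread: partitions are fixed at open().
abstract class KafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {}

    public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}

// New-style API as proposed in this thread (not necessarily the merged class).
interface FlinkKafkaPartitioner<T> extends Serializable {
    void open(int parallelInstanceId, int parallelInstances);

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Hypothetical adapter: lets an existing job keep its old KafkaPartitioner.
final class KafkaPartitionerAdapter<T> implements FlinkKafkaPartitioner<T> {
    private final KafkaPartitioner<T> delegate;
    private final int[] defaultTopicPartitions;

    KafkaPartitionerAdapter(KafkaPartitioner<T> delegate, int[] defaultTopicPartitions) {
        this.delegate = delegate;
        this.defaultTopicPartitions = defaultTopicPartitions;
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        delegate.open(parallelInstanceId, parallelInstances, defaultTopicPartitions);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // targetTopic and partitions are ignored on purpose: the old API only ever saw
        // the default topic, so the adapter mimics that rather than changing behaviour.
        return delegate.partition(record, key, value, defaultTopicPartitions.length);
    }
}
```

This matches the migration stance in the thread: deprecated code keeps its old (incorrect) semantics, and only the new API gets per-topic partitions.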
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984441#comment-15984441 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766 How would this new API map to the current one in terms of backwards compatibility?
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984402#comment-15984402 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766
@gyfora Thanks for your comment. You're right, and it's a very good question. I originally assumed that the user must know all the output topics when the job is submitted, but in real business scenarios, data may be written to dynamically generated topics.
To support dynamically generated topics, I propose adjusting the open and partition API of KafkaPartitioner as follows:
1. The open method: remove the int[] partitions parameter; it will be called once for each partitioner
public void open(int parallelInstanceId, int parallelInstances)
2. The partition method: add the target topic and int[] partitions parameters
public int partition(T next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions)
@gyfora @tzulitai What do you think of this? Please feel free to give any suggestions, thanks!
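With the proposed signatures, a partitioner no longer captures partitions in `open` and can serve any topic. A minimal sketch, assuming the interface shape proposed above; `FixedPartitioner` is an illustrative strategy (subtask index modulo the target topic's partition count), not necessarily one shipped by Flink.

```java
import java.io.Serializable;

// Proposed shape from this thread (not necessarily the merged Flink class):
// open() no longer receives partitions; partition() receives the target topic
// and that topic's partitions on every call.
interface FlinkKafkaPartitioner<T> extends Serializable {
    void open(int parallelInstanceId, int parallelInstances);

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Illustrative "fixed" strategy: each sink subtask always writes to one partition
// of whichever topic the record is routed to.
final class FixedPartitioner<T> implements FlinkKafkaPartitioner<T> {
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        if (parallelInstanceId < 0 || parallelInstances <= 0 || parallelInstanceId >= parallelInstances) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("no partitions known for topic " + targetTopic);
        }
        // Uses the partition array of the target topic, not of the default topic.
        return partitions[parallelInstanceId % partitions.length];
    }
}
```

Because the partitions arrive with each call, the same instance stays correct for a topic that did not exist when the job was submitted.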
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982630#comment-15982630 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766
Hi, thanks for the PR!
The first problem I noticed with this approach is that it will not work if users want to partition dynamically created topics (my use case, actually). We should have a default partitioner that is applied to unmatched topics and is always passed the correct partition number.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982531#comment-15982531 ] ASF GitHub Bot commented on FLINK-6288: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3766 Thanks for the PR @fanyon. I'll try to look at the changes soon.
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982277#comment-15982277 ] ASF GitHub Bot commented on FLINK-6288: --- GitHub user fanyon opened a pull request: https://github.com/apache/flink/pull/3766
[FLINK-6288] fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
1. add an extra API, addTopicPartitioner, which users can call to register a specific topic and its partitioner
2. add topicPartitionerMap in FlinkKafkaProducerBase to store the topics and partitioners
3. add PartitionerInfo to manage the topic and partitioner info
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fanyon/flink FLINK-6288
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3766.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #3766
commit a525fe605c25ce2e3c8c30cbc7c60542243c0a18
Author: mengji.fy
Date: 2017-04-24T06:16:48Z
[FLINK-6288] fix target topic uses partitioner of default topic
commit 071e06c00e8a2346d4ebcede8784f1ada5457da2
Author: mengji.fy
Date: 2017-04-25T02:03:08Z
add serialVersionUID field in PartitionerInfo
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980636#comment-15980636 ] Fang Yong commented on FLINK-6288: --
Thanks for providing the mailing list thread for this issue; it is very helpful for me. As discussed in the mailing list, I agree with adding an extra API to support a partitioner for each topic.
I think a field named Map topicPartitionerMap should be added to FlinkKafkaProducerBase, where the PartitionerInfo POJO contains fields such as topic/partitions. Users can call an extra API named addTopicPartitioner in FlinkKafkaProducerBase to add their specific topic and partitioner, and all the topics and partitioners in topicPartitionerMap will be initialized in the open method of FlinkKafkaProducerBase. When new data arrives at the sink and the target topic is in topicPartitionerMap, that topic's specific Partitioner will be used to partition the data.
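The per-topic registry described above, combined with gyfora's suggestion of a default partitioner for unmatched (for example dynamically created) topics, might be sketched as follows. `TopicPartitionerRegistry`, the simplified `Partitioner` interface and the injected lookup function are hypothetical, and the sketch assumes partition ids are `0..n-1`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the topicPartitionerMap design, with a default fallback.
final class TopicPartitionerRegistry<T> {

    // Simplified partitioner: returns a partition id in [0, numPartitions).
    interface Partitioner<R> {
        int partition(R record, int numPartitions);
    }

    // Plays the role of the PartitionerInfo POJO: a topic's partitioner plus its partitions.
    static final class PartitionerInfo<R> {
        final Partitioner<R> partitioner;
        int[] partitions; // resolved in open()

        PartitionerInfo(Partitioner<R> partitioner) {
            this.partitioner = partitioner;
        }
    }

    private final Map<String, PartitionerInfo<T>> topicPartitionerMap = new HashMap<>();
    private final Partitioner<T> defaultPartitioner;
    private final Function<String, int[]> fetchPartitions; // assumed metadata lookup

    TopicPartitionerRegistry(Partitioner<T> defaultPartitioner, Function<String, int[]> fetchPartitions) {
        this.defaultPartitioner = defaultPartitioner;
        this.fetchPartitions = fetchPartitions;
    }

    void addTopicPartitioner(String topic, Partitioner<T> partitioner) {
        topicPartitionerMap.put(topic, new PartitionerInfo<>(partitioner));
    }

    // Called once when the sink starts: resolve partitions for every registered topic.
    void open() {
        for (Map.Entry<String, PartitionerInfo<T>> entry : topicPartitionerMap.entrySet()) {
            entry.getValue().partitions = fetchPartitions.apply(entry.getKey());
        }
    }

    int partition(T record, String targetTopic) {
        PartitionerInfo<T> info = topicPartitionerMap.get(targetTopic);
        if (info != null) {
            return info.partitioner.partition(record, info.partitions.length);
        }
        // Unregistered topic: use the default partitioner, but with the target topic's
        // own partition count (a real sink would cache this lookup).
        int[] partitions = fetchPartitions.apply(targetTopic);
        return defaultPartitioner.partition(record, partitions.length);
    }
}
```

The fallback branch is what the review pointed out as missing: without it, a topic that was not registered before `open()` could not be partitioned correctly.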
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978308#comment-15978308 ] Tzu-Li (Gordon) Tai commented on FLINK-6288: Sure, please go ahead. And thanks for picking this up! By the way, just to make sure you're in sync with the whole picture of this JIRA, here's the mailing list thread where the discussion of this issue took place: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Possible-bug-in-Kafka-producer-partitioning-logic-td16972.html. cc [~rmetzger], in case you would like to chime in on any early discussions here, as I think this would require a change in our Kafka sink's custom partitioner API (see ML thread for details).
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978304#comment-15978304 ] Fang Yong commented on FLINK-6288: -- Hi [~tzulitai], I'll pick up this issue if nobody else is working on it, thanks.