[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334223#comment-17334223 ] Flink Jira Bot commented on FLINK-7913: --- This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Major > Labels: stale-assigned > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323605#comment-17323605 ] Flink Jira Bot commented on FLINK-7913: --- This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Major > Labels: stale-assigned > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383267#comment-16383267 ] Tzu-Li (Gordon) Tai commented on FLINK-7913: Yes, moving to 1.6.0. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Blocker > Fix For: 1.6.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16382135#comment-16382135 ] Aljoscha Krettek commented on FLINK-7913: - [~tzulitai] Did we decide to move this to 1.6.0? Or at least make it non-blocking? > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Blocker > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289884#comment-16289884 ] Konstantin Lalafaryan commented on FLINK-7913: -- [~tzulitai] So it means in the scope of this ticket we shouldn't change anything, right ? And also I have checked the pull request and seems like the key value has been ignored, is that correct ? > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Blocker > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286782#comment-16286782 ] Tzu-Li (Gordon) Tai commented on FLINK-7913: As a side note for clarification: I think the default paritioning behaviour in Kafka is round-robin only if there is no key attached to written records, otherwise hash partitioning is used. See: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Blocker > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286799#comment-16286799 ] Tzu-Li (Gordon) Tai commented on FLINK-7913: We should probably take that into account, and respect that the returned key from `KeyedSerializationSchema` can actually be null. That way we'll be able to cover both hash partitioning and round-robin partitioning (which is the Kafka default). We only ever write to a specific partition iff a custom partitioner is provided by the user. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan >Priority: Blocker > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286705#comment-16286705 ] Tzu-Li (Gordon) Tai commented on FLINK-7913: [~aljoscha] [~klalafaryan] Yes. I think we should also do it now along with the constructors overhaul that is coming up for the Kafka consumer / producer. That would give users a smoother / predictable migration to the new behaviour. Related JIRAs are FLINK-8191 (adding the round-robin partitioner) and FLINK-5728 (which suggests new constructors for the producer). > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286706#comment-16286706 ] Tzu-Li (Gordon) Tai commented on FLINK-7913: This issue seems to troubling quite a few users. I'll elevate the above mentioned issues to BLOCKER in case they slip through the cracks again for 1.5.0 > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286189#comment-16286189 ] Aljoscha Krettek commented on FLINK-7913: - [~tzulitai] I think you also already thought about changing this, right? > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16285816#comment-16285816 ] Konstantin Lalafaryan commented on FLINK-7913: -- - Kafka uses round-robin partitioner - flink uses parallel instance id to decide the target partition. So let's say you have 12 kafka partitions and only two flink partitions in this case only two kafka partitions will be used. You can check the logic here: org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262932#comment-16262932 ] Aljoscha Krettek commented on FLINK-7913: - What is the default Kafka partitioner doing different from the default Flink Kafka partitioner? > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262244#comment-16262244 ] Konstantin Lalafaryan commented on FLINK-7913: -- [~aljoscha], [~tzulitai] do you have an update ? Thanks. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224747#comment-16224747 ] Aljoscha Krettek commented on FLINK-7913: - That seems weird, I think we should probably use the default Kafka (from Kafka) partitioner if nothing is specified. What do you think, [~tzulitai]? > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223340#comment-16223340 ] Konstantin Lalafaryan commented on FLINK-7913: -- [~aljoscha] What do you think about it ? > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218776#comment-16218776 ] Konstantin Lalafaryan commented on FLINK-7913: -- Thanks for your comment. Yes, you are right. But I have just found out that you can use the Kafka's default partitioner by doing following: {code:java} outputStream.addSink(new FlinkKafkaProducer010<>(producerProperties.getProperty(TOPIC), new EventSerializationSchema(),producerProperties, null)); {code} Basically you have to pass null value for customPartitioner. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218556#comment-16218556 ] Aljoscha Krettek commented on FLINK-7913: - This issue aims at using vanilla Kafka partitioners with the {{FlinkKafkaProducers}}, right? If yes, I'm very much in favour of this. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in the Apache Flink it is available only *FlinkKafkaPartitioner* > and just one implementation *FlinkFixedPartitioner*. > In order to be able to use Kafka's default partitioner you have to create new > implementation for *FlinkKafkaPartitioner* and fork the code from the Kafka. > It will be really good to be able to define the partitioner without > implementing the new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)