[jira] [Commented] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

2023-05-14 Thread Liam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722618#comment-17722618
 ] 

Liam commented on FLINK-31762:
--

Hi [~martijnvisser] [~tzulitai], any comments?

> Subscribe to multiple Kafka topics may cause partition assignment skew
> --
>
> Key: FLINK-31762
> URL: https://issues.apache.org/jira/browse/FLINK-31762
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.18.0
>Reporter: Liam
>Priority: Major
> Attachments: image-2023-04-11-08-00-16-054.png, 
> image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and 
> each topic has four partitions. We have set the parallelism to eight to 
> consume these two topics. However, the current partition assignment method 
> may lead to some subtasks being assigned two partitions while others are left 
> with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 
> partitions. If I set the parallelism to 1000, some slots may be assigned 
> seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this 
> approach, round-robin assignment takes place between all topics, not just one.
> For example, the ideal assignment for the case mentioned above is presented 
> below:
>  
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>  
> FYI: this is how partitions are currently assigned:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
> For the Kafka Source, it is implemented in the KafkaSourceEnumerator as shown below:
> {code:java}
>     static int getSplitOwner(TopicPartition tp, int numReaders) {
>         int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFF) % numReaders;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + tp.partition()) % numReaders;
>     }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

2023-04-11 Thread Liam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liam updated FLINK-31762:
-
Description: 
To simplify the demonstration, let us assume that there are two topics, and 
each topic has four partitions. We have set the parallelism to eight to consume 
these two topics. However, the current partition assignment method may lead to 
some subtasks being assigned two partitions while others are left with none.

!image-2023-04-11-08-00-16-054.png|width=500,height=143!

In my case, the situation is even worse as I have ten topics, each with 100 
partitions. If I set the parallelism to 1000, some slots may be assigned seven 
partitions while others remain unassigned.

To address this issue, I propose a new partition assignment solution. In this 
approach, round-robin assignment takes place between all topics, not just one.
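As a rough illustration of the idea (the class and method names here are hypothetical, not actual Flink API), a global round-robin could flatten all topic-partitions into one deterministically ordered list and deal them out one by one:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GlobalRoundRobinAssigner {

    // Hypothetical sketch, not actual Flink API: flatten every (topic, partition)
    // pair into one deterministically ordered list (topics sorted by name,
    // partitions ascending), then deal the list out round-robin. With 2 topics of
    // 4 partitions each and 8 subtasks, every subtask receives exactly one partition.
    static Map<Integer, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                             int numSubtasks) {
        List<String> all = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                all.add(e.getKey() + "-" + p);
            }
        }
        Map<Integer, List<String>> owners = new HashMap<>();
        for (int i = 0; i < all.size(); i++) {
            owners.computeIfAbsent(i % numSubtasks, k -> new ArrayList<>()).add(all.get(i));
        }
        return owners;
    }
}
```

A real implementation would also need the ordering to stay deterministic across enumerator restarts so that the assignment remains stable.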

For example, the ideal assignment for the case mentioned above is presented 
below:

 

!https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!

This new solution can also handle cases where each topic has more partitions.

!image-2023-04-11-08-12-24-115.png|width=444,height=127!

Let us work together to reach a consensus on this proposal. Thank you!

 

FYI: this is how partitions are currently assigned:
{code:java}
public class KafkaTopicPartitionAssigner {

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
    }

    public static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % numParallelSubtasks;
        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        return (startIndex + partition) % numParallelSubtasks;
    }
}
{code}
For the Kafka Source, it is implemented in the KafkaSourceEnumerator as shown below:
{code:java}
    static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFF) % numReaders;
        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        return (startIndex + tp.partition()) % numReaders;
    }
{code}
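To make the skew concrete, here is a small self-contained sketch (topic names are arbitrary placeholders) that replays the same start-index arithmetic and tallies how many partitions land on each reader:

```java
import java.util.Arrays;

public class SkewDemo {

    // Same arithmetic as the current KafkaSourceEnumerator.getSplitOwner quoted above.
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        // Two topics with four partitions each, parallelism 8. Each topic fills
        // 4 consecutive slots starting at its hash-derived start index, so when
        // the two start indexes are close, some readers own two partitions
        // while others own none.
        int numReaders = 8;
        int[] counts = new int[numReaders];
        for (String topic : new String[] {"topic-a", "topic-b"}) {
            for (int p = 0; p < 4; p++) {
                counts[getSplitOwner(topic, p, numReaders)]++;
            }
        }
        System.out.println(Arrays.toString(counts));
    }
}
```

Whether the counts actually overlap depends on the topic names' hash codes, which is exactly the problem: balance is left to chance rather than guaranteed.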
 

 



[jira] [Updated] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

2023-04-10 Thread Liam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liam updated FLINK-31762:
-
Summary: Subscribe to multiple Kafka topics may cause partition assignment 
skew  (was: Subscribe multiple Kafka topics may cause partition assignment skew)

> Subscribe to multiple Kafka topics may cause partition assignment skew
> --
>
> Key: FLINK-31762
> URL: https://issues.apache.org/jira/browse/FLINK-31762
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.18.0
>Reporter: Liam
>Priority: Major
> Attachments: image-2023-04-11-08-00-16-054.png, 
> image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and 
> each topic has four partitions. We have set the parallelism to eight to 
> consume these two topics. However, the current partition assignment method 
> may lead to some subtasks being assigned two partitions while others are left 
> with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 
> partitions. If I set the parallelism to 1000, some slots may be assigned 
> seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this 
> approach, round-robin assignment takes place between all topics, not just one.
> For example, the ideal assignment for the case mentioned above is presented 
> below:
>  
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>  
> FYI: how the partition be assigned currently
> {code:java}
> public class KafkaTopicPartitionAssigner {    
>     public static int assign(KafkaTopicPartition partition, int 
> numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), 
> numParallelSubtasks);
>     }    public static int assign(String topic, int partition, int 
> numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % 
> numParallelSubtasks;        // here, the assumption is that the id of Kafka 
> partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset 
> clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
>  {code}
>  
>  





[jira] [Created] (FLINK-31762) Subscribe multiple Kafka topics may cause partition assignment skew

2023-04-10 Thread Liam (Jira)
Liam created FLINK-31762:


 Summary: Subscribe multiple Kafka topics may cause partition 
assignment skew
 Key: FLINK-31762
 URL: https://issues.apache.org/jira/browse/FLINK-31762
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.13.0, 1.18.0
Reporter: Liam
 Attachments: image-2023-04-11-08-00-16-054.png, 
image-2023-04-11-08-12-24-115.png






[jira] [Closed] (FLINK-27463) The JDBC connector should support option for ‘upsert’ or ‘insert’

2023-04-10 Thread Liam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liam closed FLINK-27463.

Resolution: Won't Do

> The JDBC connector should support option for ‘upsert’ or ‘insert’
> -
>
> Key: FLINK-27463
> URL: https://issues.apache.org/jira/browse/FLINK-27463
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.14.4
>Reporter: Liam
>Priority: Minor
>
> If a primary key is set on the table, the connector will use an upsert query 
> to sink data.
> However, an upsert query is not always what users expect, because some 
> databases do not support upsert well, such as ClickHouse. (Even though some 
> databases are not officially supported, I believe many users extend the 
> dialect by themselves.)
> So I suggest adding an option like 'sink.append-only' to make the connector 
> populate data with plain insert queries.
>  
> If we can reach a consensus, I can implement it.





[jira] [Created] (FLINK-27463) The JDBC connector should support option for ‘upsert’ or ‘insert’

2022-05-01 Thread Liam (Jira)
Liam created FLINK-27463:


 Summary: The JDBC connector should support option for ‘upsert’ or 
‘insert’
 Key: FLINK-27463
 URL: https://issues.apache.org/jira/browse/FLINK-27463
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.14.4
Reporter: Liam


If a primary key is set on the table, the connector will use an upsert query to 
sink data.

However, an upsert query is not always what users expect, because some databases 
do not support upsert well, such as ClickHouse. (Even though some databases are 
not officially supported, I believe many users extend the dialect by themselves.)

So I suggest adding an option like 'sink.append-only' to make the connector 
populate data with plain insert queries.

 

If we can reach a consensus, I can implement it.
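As a rough sketch of the proposed behavior ('sink.append-only' is the suggested option name, not an existing Flink option, and the class below is purely illustrative), the statement builder would fall back to a plain INSERT whenever the flag is set, even for a keyed table:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class SinkStatementChooser {

    // Illustrative only: with the hypothetical append-only flag enabled, a keyed
    // table is still written with plain INSERTs instead of an upsert statement.
    static String buildSql(String table, String[] cols, boolean hasPrimaryKey,
                           boolean appendOnly) {
        String columns = String.join(", ", cols);
        String placeholders =
                Arrays.stream(cols).map(c -> "?").collect(Collectors.joining(", "));
        String insert =
                "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
        if (!hasPrimaryKey || appendOnly) {
            return insert; // append-only path: plain insert regardless of keys
        }
        // MySQL-style upsert, shown purely as an example dialect
        String updates = Arrays.stream(cols)
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return insert + " ON DUPLICATE KEY UPDATE " + updates;
    }
}
```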



--
This message was sent by Atlassian Jira
(v8.20.7#820007)