Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-09 Thread Jeyhun Karimov
Hi Lincoln, I think I was misunderstood. My approach was not to use MiniBatchLocalGroupAggFunction directly but to use an approach similar to it. Currently, local and global aggregate functions are used together in query plans. In my quick PoC, I verified that my modified version of

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-09 Thread Lincoln Lee
Thanks Jeyhun for your reply! Unfortunately, MiniBatchLocalGroupAggFunction only works for local agg in two-phase aggregation, while global aggregation (which is actually handled by the KeyedMapBundleOperator) still relies on the KeyedStream, meaning that consistency of the partitioner and state

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-05 Thread Jeyhun Karimov
Hi Lincoln, I did a bit of analysis on a small PoC. Please find my comments below:
- In general, the current design supports streaming workloads. However, as you mentioned, it comes with some (implementation-related) difficulties. One of them (as you also mentioned) is that most of the operators

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-05 Thread Jeyhun Karimov
Hi Leonard, I did a bit of analysis on possible integration of this FLIP with the Kafka source. Please find the sample code and explanations below:
public class KafkaDynamicSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown, SupportsPartitioning { // other

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Jeyhun Karimov
Hi Lincoln, Thanks for your reply. My idea was to utilize MapBundleFunction as it was already used in a similar context - MiniBatchLocalGroupAggFunction. I can also extend my PoC for streaming sources and get back to continue our discussion. Regards, Jeyhun On Wed, Apr 3, 2024 at 4:33 PM

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Jeyhun Karimov
Hi Leonard, Thanks a lot for your comments. Please find my answers below:
> (1) The FLIP motivation section says Kafka broker is already partitioned w.r.t. some key[s]. Is this the main use case in Kafka world? Partitioning by key fields is not the default partitioner of Kafka default

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Lincoln Lee
Hi Jeyhun, Thanks for your quick response! In the streaming scenario, shuffle commonly occurs before the stateful operator, and there's a sanity check[1] when the stateful operator accesses the state. This implies the consistency requirement of the partitioner used for data shuffling and state key
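
The consistency requirement Lincoln describes can be illustrated with a small sketch. It assumes a simplified key-group scheme: a key is hashed into one of `maxParallelism` key groups, and each key group belongs to exactly one subtask. The class and method names are hypothetical, and `hashCode()` is only a stand-in for the hash Flink actually applies, so this shows the shape of the check rather than Flink's implementation.

```java
/** Hypothetical sketch; names and the hash function are assumptions, not Flink code. */
final class KeyGroupSketch {

    /** Maps a key to a key group in [0, maxParallelism). Stand-in hash: hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the subtask that owns it, using contiguous key-group ranges. */
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * Returns true if a record with this key arrived at the subtask that owns its key group.
     * A source that is pre-partitioned with a different partitioner can fail this check.
     */
    static boolean isConsistent(Object key, int actualSubtask, int maxParallelism, int parallelism) {
        int expected = subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
        return expected == actualSubtask;
    }
}
```

If the external system (for example, a Kafka producer) places a key on a partition that is read by a different subtask than the one computed here, keyed state access for that key would be rejected, which is why skipping the shuffle is not straightforward for stateful streaming operators.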

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Leonard Xu
Hey Jeyhun, Thanks for kicking off this discussion. I have two questions about streaming sources: (1) The FLIP motivation section says Kafka broker is already partitioned w.r.t. some key[s]. Is this the main use case in Kafka world? Partitioning by key fields is not the default partitioner

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-02 Thread Jeyhun Karimov
Hi Lincoln, Thanks a lot for your comments. Please find my answers below.
> 1. Is this FLIP targeted only at batch scenarios or does it include streaming?
> (The FLIP and the discussion did not explicitly mention this, but in the draft PR, I only saw the implementation for batch scenarios

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-02 Thread Lincoln Lee
Hi Jeyhun, Thank you for driving this, it would be a very useful optimization! Sorry for joining the discussion now (I originally planned to reply earlier, but happened to be during my vacation). I have two questions: 1. Is this FLIP targeted only at batch scenarios or does it include streaming?

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-01 Thread Jeyhun Karimov
Hi everyone, Thanks for your valuable feedback! The discussion on this FLIP has been going on for a while. I would like to start a vote after 48 hours. Please let me know if you have any concerns or any further questions/comments. Regards, Jeyhun On Thu, Mar 21, 2024 at 6:01 PM Jeyhun

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-21 Thread Jeyhun Karimov
Hi Lorenzo, Thanks a lot for your comments. Please find my answers below:
> For the interface `SupportsPartitioning`, why returning `Optional`? If one decides to implement that, partitions must exist (at maximum, return an empty list). Returning `Optional` seems just to complicate the logic

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-21 Thread Benchao Li
Jeyhun, Sorry for the delay. And thanks for the explanation, it sounds good to me!
On Sat, Mar 16, 2024 at 05:09, Jeyhun Karimov wrote:
> Hi Benchao,
> Thanks for your comments.
> > 1. What parallelism would you take? E.g., 128 + 256 => 128? What if we cannot have a good greatest common divisor,

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-21 Thread lorenzo . affetti
Hello Jeyhun, I really like the proposal and it definitely makes sense to me. I have a couple of nits here and there: For the interface `SupportsPartitioning`, why returning `Optional`? If one decides to implement that, partitions must exist (at maximum, return an empty list). Returning

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-15 Thread Jeyhun Karimov
Hi Benchao, Thanks for your comments.
> 1. What parallelism would you take? E.g., 128 + 256 => 128? What if we cannot have a good greatest common divisor, like 127 + 128, could we just utilize one side's pre-partitioned attribute, and let the other side just do the shuffle?
There are two
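
The arithmetic behind Benchao's question can be sketched as follows. This is only an illustration: it assumes, hypothetically, that a planner would use the greatest common divisor of the two sides' partition counts as the shared join parallelism, and would fall back to shuffling one side when that divisor is below some threshold. The class name, method names, and the `minUseful` threshold are not taken from the FLIP.

```java
import java.util.OptionalInt;

/** Hypothetical helper; not part of Flink or FLIP-434. */
final class JoinParallelismSketch {

    /** Greatest common divisor via the Euclidean algorithm. */
    static int gcd(int a, int b) {
        while (b != 0) {
            int r = a % b;
            a = b;
            b = r;
        }
        return a;
    }

    /**
     * Returns a parallelism both pre-partitioned sides can share without a shuffle,
     * or empty if the common divisor is smaller than minUseful (assumed policy:
     * in that case keep one side's partitioning and shuffle the other side).
     */
    static OptionalInt sharedParallelism(int leftPartitions, int rightPartitions, int minUseful) {
        int g = gcd(leftPartitions, rightPartitions);
        return g >= minUseful ? OptionalInt.of(g) : OptionalInt.empty();
    }
}
```

With these assumptions, 128 and 256 partitions share a parallelism of 128, while 127 and 128 only share a divisor of 1, so the sketch returns empty and the caller would shuffle one side.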

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Jeyhun Karimov
Hi Jane, Thanks for your comments.
> I understand your point. **It would be better if you could sync the content to the FLIP**.
- Sure thing. I added my above answer to the FLIP.
> Another thing is I'm curious about what the physical plan looks like. Is there any specific info that will be

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Jeyhun Karimov
Hi Hang, Thanks for the comments.
> I have a question about the part `Additional option to disable this optimization`. Is this option a source configuration or a table configuration?
- It is a source configuration.
> Besides that, there is a little mistake if I do not understand wrongly.

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Benchao Li
Thanks Jeyhun for bringing up this discussion, it is really exciting, +1 for the general idea. We also introduced a similar concept in Flink Batch internally to cope with bucketed tables in Hive, it is a very important improvement.
> One thing to note is that for join queries, the parallelism of

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Jane Chan
Hi Jeyhun, Thanks for your clarification.
> Once a new partition is detected, we add it to our existing mapping. Our mapping looks like Map> subtaskToPartitionAssignment, where it maps each source subtaskID to zero or more partitions.
I understand your point. **It would be better if you could
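
The mapping described in the quoted text can be sketched roughly as below. The type parameters (`Integer` subtask IDs, `String` partition names) and the least-loaded assignment policy are assumptions made for illustration; the quoted message only states that each source subtask ID maps to zero or more partitions and that newly detected partitions are added to the existing mapping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; type parameters and assignment policy are assumptions. */
final class PartitionAssignmentSketch {
    private final int parallelism;
    // Each source subtask ID maps to zero or more partitions.
    private final Map<Integer, List<String>> subtaskToPartitionAssignment = new HashMap<>();

    PartitionAssignmentSketch(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        this.parallelism = parallelism;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskToPartitionAssignment.put(subtask, new ArrayList<>());
        }
    }

    /**
     * Adds a newly detected partition to the existing mapping.
     * Assumed policy: pick the subtask with the fewest partitions, lowest ID on ties.
     * Returns the chosen subtask ID.
     */
    int addPartition(String partition) {
        int target = 0;
        for (int subtask = 1; subtask < parallelism; subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).size()
                    < subtaskToPartitionAssignment.get(target).size()) {
                target = subtask;
            }
        }
        subtaskToPartitionAssignment.get(target).add(partition);
        return target;
    }

    /** Returns the partitions currently assigned to the given subtask. */
    List<String> partitionsOf(int subtask) {
        return subtaskToPartitionAssignment.get(subtask);
    }
}
```

Because whole partitions are assigned to a single subtask, all records of one partition are still read by one subtask after a new partition appears; existing assignments are never moved in this sketch.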

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Hang Ruan
Hi, Jeyhun. Thanks for the FLIP. Totally +1 for it. I have a question about the part `Additional option to disable this optimization`. Is this option a source configuration or a table configuration? Besides that, there is a little mistake if I do not understand wrongly. Should `Check if

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-13 Thread Jeyhun Karimov
Hi Jane, Thanks for your comments.
> 1. Concerning the `sourcePartitions()` method, the partition information returned during the optimization phase may not be the same as the partition information during runtime execution. For long-running jobs, partitions may be continuously created. Is

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-13 Thread Jeyhun Karimov
Hi Jim, Thanks for your comments.
> I wonder if it'd make sense to generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and "partitions" (and maybe even situations where a data source is partitioned and bucketed).
Now that I go through FLIP-376 [1] again, your suggestion

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-12 Thread Jane Chan
Hi Jeyhun, Thank you for leading the discussion. I'm generally +1 with this proposal, along with some questions. Please see my comments below. 1. Concerning the `sourcePartitions()` method, the partition information returned during the optimization phase may not be the same as the partition

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-11 Thread Jim Hughes
Hi Jeyhun, I like the idea! Given FLIP-376[1], I wonder if it'd make sense to generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and "partitions" (and maybe even situations where a data source is partitioned and bucketed). Separate from that, the page mentions TPC-H Q1 as an

[DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-10 Thread Jeyhun Karimov
Hi devs, I’d like to start a discussion on FLIP-434: Support optimizations for pre-partitioned data sources [1]. The FLIP introduces taking advantage of pre-partitioned data sources for SQL/Table API (it is already supported as an experimental feature in the DataStream API [2]). Please find more