Hi Lincoln,
I think I was misunderstood.
My approach was not to use MiniBatchLocalGroupAggFunction directly, but to
use a similar approach to it.
Currently, local and global aggregate functions are used together in query
plans.
In my quick PoC, I verified that my modified version of
Thanks Jeyhun for your reply!
Unfortunately, MiniBatchLocalGroupAggFunction only works for local agg
in two-phase aggregation, while global aggregation (which is actually
handled by the KeyedMapBundleOperator) still relies on the KeyedStream,
meaning that consistency of the partitioner and state
Hi Lincoln,
I did a bit of analysis on small PoC.
Please find my comments below:
- In general, the current design supports streaming workloads. However, as you
mentioned, it comes with some (implementation-related) difficulties.
One of them (as you also mentioned) is that most of the operators
Hi Leonard,
I did a bit of analysis on possible integration of this FLIP with Kafka
source. Please find the sample code and explanations below:
public class KafkaDynamicSource implements ScanTableSource,
        SupportsReadingMetadata, SupportsWatermarkPushDown,
        SupportsPartitioning {
    // other
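To make the shape concrete, here is a self-contained sketch of such an integration. The interface, its method, and the PrePartitionedKafkaSource class below are illustrative assumptions for discussion, not the final FLIP-434 API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class PartitioningSketch {
    // Hypothetical, simplified stand-in for the ability interface proposed
    // in FLIP-434; the real Flink interface may differ in name and signature.
    interface SupportsPartitioning {
        // Keys the source data is already partitioned by, if known.
        Optional<List<String>> sourcePartitions();
    }

    // Simplified stand-in for a Kafka-backed table source that declares its
    // pre-partitioned keys so the planner can elide the shuffle before
    // keyed operators.
    static class PrePartitionedKafkaSource implements SupportsPartitioning {
        private final List<String> partitionKeys;

        PrePartitionedKafkaSource(List<String> partitionKeys) {
            this.partitionKeys = partitionKeys;
        }

        @Override
        public Optional<List<String>> sourcePartitions() {
            // Empty Optional means "partitioning unknown", which is distinct
            // from an empty key list.
            return partitionKeys.isEmpty() ? Optional.empty() : Optional.of(partitionKeys);
        }
    }

    public static void main(String[] args) {
        SupportsPartitioning source =
                new PrePartitionedKafkaSource(Arrays.asList("user_id"));
        System.out.println(source.sourcePartitions().orElse(List.of()));
    }
}
```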
Hi Lincoln,
Thanks for your reply. My idea was to utilize MapBundleFunction as it was
already used in a similar context - MiniBatchLocalGroupAggFunction.
I can also extend my PoC for streaming sources and get back to continue our
discussion.
Regards,
Jeyhun
On Wed, Apr 3, 2024 at 4:33 PM
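For reference, the MapBundleFunction idea mentioned above can be rendered as a simplified, self-contained sketch. The real Flink class has a different signature and lifecycle; LocalCount below is a toy local count aggregate in the spirit of MiniBatchLocalGroupAggFunction:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class BundleSketch {
    // Simplified rendering of the MapBundleFunction contract: buffer a value
    // per key while a bundle is open, then flush everything in finishBundle.
    abstract static class MapBundleFunctionSketch<K, V, IN, OUT> {
        // Combine one input with the value buffered so far for its key
        // (null means no value buffered yet).
        abstract V addInput(V value, IN input);

        // Emit results for the whole bundle.
        abstract void finishBundle(Map<K, V> buffer, BiConsumer<K, OUT> out);
    }

    // Toy local (pre-shuffle) count aggregation.
    static class LocalCount extends MapBundleFunctionSketch<String, Long, String, Long> {
        @Override
        Long addInput(Long value, String input) {
            return value == null ? 1L : value + 1L;
        }

        @Override
        void finishBundle(Map<String, Long> buffer, BiConsumer<String, Long> out) {
            buffer.forEach(out);
        }
    }

    public static void main(String[] args) {
        LocalCount fn = new LocalCount();
        Map<String, Long> bundle = new HashMap<>();
        for (String key : new String[] {"a", "b", "a"}) {
            bundle.put(key, fn.addInput(bundle.get(key), key));
        }
        fn.finishBundle(bundle, (k, v) -> System.out.println(k + "=" + v));
    }
}
```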
Hi Leonard,
Thanks a lot for your comments. Please find my answers below:
(1) The FLIP motivation section says Kafka broker is already partitioned
> w.r.t. some key[s]. Is this the main use case in the Kafka world? Partitioning
> by key fields is not the default partitioner of Kafka default
>
Hi Jeyhun,
Thanks for your quick response!
In the streaming scenario, a shuffle commonly occurs before the stateful
operator, and there's a sanity check[1] when the stateful operator
accesses the state. This implies a consistency requirement between the
partitioner used for data shuffling and the state key
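That consistency requirement can be sketched with a simplified key-group computation, loosely modeled on Flink's KeyGroupRangeAssignment. The plain hashCode() and the helper names are simplifications for illustration, not Flink's actual code (Flink applies a murmur hash internally):

```java
public class KeyGroupSketch {
    // Simplified key-group assignment: a record's key determines its key group.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each key group is owned by exactly one subtask. If the upstream
    // partitioner routes records with a given key to a different subtask
    // than the one owning the key's group, the state sanity check fails
    // when the operator accesses keyed state.
    static int keyGroupToSubtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        int keyGroup = assignToKeyGroup(42, maxParallelism); // key 42 -> group 42
        System.out.println(keyGroupToSubtask(keyGroup, maxParallelism, parallelism)); // 1
    }
}
```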
Hey, Jeyhun
Thanks for kicking off this discussion. I have two questions about streaming
sources:
(1) The FLIP motivation section says Kafka broker is already partitioned w.r.t.
some key[s]. Is this the main use case in the Kafka world? Partitioning by key
fields is not the default partitioner
Hi Lincoln,
Thanks a lot for your comments. Please find my answers below.
1. Is this FLIP targeted only at batch scenarios or does it include
> streaming?
> (The flip and the discussion did not explicitly mention this, but in the
> draft pr, I only
> saw the implementation for batch scenarios
>
Hi Jeyhun,
Thank you for driving this, it would be very useful optimization!
Sorry for joining the discussion only now (I originally planned to reply
earlier, but it happened to be during my vacation). I have two questions:
1. Is this FLIP targeted only at batch scenarios or does it include
streaming?
Hi everyone,
Thanks for your valuable feedback!
The discussion on this FLIP has been going on for a while.
I would like to start a vote after 48 hours.
Please let me know if you have any concerns or any further
questions/comments.
Regards,
Jeyhun
On Thu, Mar 21, 2024 at 6:01 PM Jeyhun
Hi Lorenzo,
Thanks a lot for your comments. Please find my answers below:
For the interface `SupportsPartitioning`, why returning `Optional`?
> If one decides to implement that, partitions must exist (at maximum,
> return an empty list). Returning `Optional` seems just to complicate the
> logic
Jeyhun,
Sorry for the delay. And thanks for the explanation, it sounds good to me!
On Sat, Mar 16, 2024 at 05:09, Jeyhun Karimov wrote:
>
> Hi Benchao,
>
> Thanks for your comments.
>
1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor,
Hello Jeyhun,
I really like the proposal, and it definitely makes sense to me.
I have a couple of nits here and there:
For the interface `SupportsPartitioning`, why returning `Optional`?
If one decides to implement that, partitions must exist (at maximum, return an
empty list). Returning
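The question can be made concrete with a minimal sketch contrasting the two signatures. Both interfaces below are illustrative stand-ins, not the FLIP's actual definitions:

```java
import java.util.List;
import java.util.Optional;

public class OptionalVsList {
    // Variant A (as drafted in the FLIP): absence of partition information
    // is modeled with Optional.
    interface WithOptional {
        Optional<List<String>> sourcePartitions();
    }

    // Variant B (the suggestion above): an empty list already expresses
    // "no partitions", so the Optional wrapper may be redundant.
    interface WithList {
        List<String> sourcePartitions();
    }

    public static void main(String[] args) {
        WithOptional a = () -> Optional.empty();
        WithList b = () -> List.of();
        System.out.println(a.sourcePartitions().isPresent()); // false
        System.out.println(b.sourcePartitions().isEmpty());   // true
    }
}
```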
Hi Benchao,
Thanks for your comments.
1. What parallelism would you take? E.g., 128 + 256 => 128? What
> if we cannot have a good greatest common divisor, like 127 + 128,
> could we just utilize one side's pre-partitioned attribute, and let
> another side just do the shuffle?
There are two
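For illustration, the arithmetic behind the question can be sketched as follows. The GCD-based policy and the commonParallelism helper are hypothetical, for discussion only, and not the FLIP's decision:

```java
public class ParallelismGcd {
    // Greatest common divisor via Euclid's algorithm.
    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // Hypothetical policy: pick a shared parallelism for two pre-partitioned
    // join inputs as their GCD. A result of 1 means there is no useful common
    // divisor, so one side would have to be re-shuffled.
    static int commonParallelism(int left, int right) {
        return gcd(left, right);
    }

    public static void main(String[] args) {
        System.out.println(commonParallelism(128, 256)); // 128
        System.out.println(commonParallelism(127, 128)); // 1 -> no useful divisor
    }
}
```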
Hi Jane,
Thanks for your comments.
I understand your point. **It would be better if you could sync the
> content to the FLIP**.
- Sure thing. I added my above answer to the FLIP.
Another thing is I'm curious about what the physical plan looks like. Is
> there any specific info that will be
Hi Hang,
Thanks for the comments.
I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?
- It is a source configuration.
Besides that, there is a little mistake, if I have not misunderstood.
>
Thanks Jeyhun for bringing up this discussion; it is really exciting.
+1 for the general idea.
We also introduced a similar concept in Flink Batch internally to cope
with bucketed tables in Hive, it is a very important improvement.
> One thing to note is that for join queries, the parallelism of
Hi Jeyhun,
Thanks for your clarification.
> Once a new partition is detected, we add it to our existing mapping. Our
mapping looks like Map<subtaskID, Set<partition>> subtaskToPartitionAssignment,
where it maps each source subtaskID to zero or more partitions.
I understand your point. **It would be better if you could
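A minimal, self-contained sketch of such a subtask-to-partition mapping, assuming a hypothetical hash-based assignNewPartition helper for newly discovered partitions (real discovery logic is more involved):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionAssignmentSketch {
    // Maps each source subtask ID to the (zero or more) partitions it reads.
    static final Map<Integer, Set<String>> subtaskToPartitionAssignment = new HashMap<>();

    // Hypothetical routing policy: hash the newly discovered partition onto a
    // subtask and record it in the existing mapping.
    static void assignNewPartition(String partition, int numSubtasks) {
        int subtask = Math.floorMod(partition.hashCode(), numSubtasks);
        subtaskToPartitionAssignment
                .computeIfAbsent(subtask, k -> new HashSet<>())
                .add(partition);
    }

    public static void main(String[] args) {
        assignNewPartition("topic-0", 2);
        assignNewPartition("topic-1", 2);
        // Total number of partitions assigned across all subtasks.
        System.out.println(subtaskToPartitionAssignment.values().stream()
                .mapToInt(Set::size).sum());
    }
}
```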
Hi, Jeyhun.
Thanks for the FLIP. Totally +1 for it.
I have a question about the part `Additional option to disable this
optimization`. Is this option a source configuration or a table
configuration?
Besides that, there is a little mistake, if I have not misunderstood.
Should `Check if
Hi Jane,
Thanks for your comments.
1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is
Hi Jim,
Thanks for your comments.
I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).
Now that I go through FLIP-376 [1] again, your suggestion
Hi Jeyhun,
Thank you for leading the discussion. I'm generally +1 with this proposal,
along with some questions. Please see my comments below.
1. Concerning the `sourcePartitions()` method, the partition information
returned during the optimization phase may not be the same as the partition
Hi Jeyhun,
I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
"partitions" (and maybe even situations where a data source is partitioned
and bucketed).
Separate from that, the page mentions TPC-H Q1 as an
Hi devs,
I’d like to start a discussion on FLIP-434: Support optimizations for
pre-partitioned data sources [1].
The FLIP introduces taking advantage of pre-partitioned data sources for
SQL/Table API (it is already supported as experimental feature in
DataStream API [2]).
Please find more