Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-20 Thread Hequn Cheng
Hi Piotrek,

I agree that the shuttle approach makes the code simpler and cleaner. But it
would lose the capabilities that rules provide.

For Volcano, it does stop searching the plan space when it reaches the
maximum number of iterations, but only on the condition that the resulting
plan is valid; otherwise it throws a "cannot find a valid plan" exception. In
this case, we have to design the retraction rules carefully to make sure the
resulting plan is valid. Other rules need not care about retraction
details. The advantage of Volcano is that it can find the plan with the
lowest cost. This is very helpful when we have to consider the cost.

For Hep, we don't have to order rules manually. Hep has a mode that
executes rules in arbitrary order (instead of sequential order). In
arbitrary-order mode, the optimizer stops when no rule can be applied any more.
In this case, other rules needn't care about retraction details either,
because the retraction rules will be applied again if any rule
pollutes the retraction traits.
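
To make the arbitrary-order argument concrete, here is a minimal sketch of a
"run until no rule matches" loop. It is a toy model and not Calcite's
HepPlanner API: ToyPlan, ToyRule, FixpointSketch and both rule names are
hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Toy plan state (hypothetical); a real plan would be a tree of RelNodes. */
final class ToyPlan {
    final boolean filterPushedDown;       // has the made-up optimisation fired?
    final boolean retractionTraitsValid;  // are the retraction traits consistent?

    ToyPlan(boolean filterPushedDown, boolean retractionTraitsValid) {
        this.filterPushedDown = filterPushedDown;
        this.retractionTraitsValid = retractionTraitsValid;
    }
}

/** Hypothetical rule: returns the rewritten plan, or empty if it does not match. */
abstract class ToyRule {
    final String name;

    ToyRule(String name) {
        this.name = name;
    }

    abstract Optional<ToyPlan> apply(ToyPlan plan);
}

final class FixpointSketch {
    /** An optimisation that knows nothing about retraction and pollutes the traits. */
    static final ToyRule PUSH_FILTER = new ToyRule("pushFilter") {
        @Override
        Optional<ToyPlan> apply(ToyPlan plan) {
            return plan.filterPushedDown
                    ? Optional.<ToyPlan>empty()
                    : Optional.of(new ToyPlan(true, false));
        }
    };

    /** The retraction rule: matches only while the traits are inconsistent. */
    static final ToyRule FIX_RETRACTION = new ToyRule("fixRetraction") {
        @Override
        Optional<ToyPlan> apply(ToyPlan plan) {
            return plan.retractionTraitsValid
                    ? Optional.<ToyPlan>empty()
                    : Optional.of(new ToyPlan(plan.filterPushedDown, true));
        }
    };

    /** Applies the rules until a full pass fires none of them; records what fired. */
    static ToyPlan optimize(ToyPlan start, List<ToyRule> rules, List<String> fired) {
        ToyPlan current = start;
        boolean changed = true;
        while (changed) {
            changed = false;
            for (ToyRule rule : rules) {
                Optional<ToyPlan> next = rule.apply(current);
                if (next.isPresent()) {
                    current = next.get();
                    fired.add(rule.name);
                    changed = true;
                }
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        ToyPlan result = optimize(new ToyPlan(false, true),
                Arrays.asList(FIX_RETRACTION, PUSH_FILTER), fired);
        System.out.println(fired + " valid=" + result.retractionTraitsValid);
    }
}
```

Even with the retraction rule listed first, it fires again after the filter
rule has polluted the traits, so the loop ends with consistent traits. The
sketch says nothing about cost or about how Calcite actually schedules rules.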

Best, Hequn



Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-18 Thread Piotr Nowojski
Hi

I had some follow-up thoughts on this matter over the weekend. I might be
mistaken here, since I have never worked with Calcite before, so whatever I’m
writing is based on my previous experience with optimisers and mostly on
guesses/assumptions that Calcite works as I would expect it to. So if I’m
missing something, I would be glad to learn :) With this disclaimer:

> I think it is better to make rules orthogonal. Each rule focuses on an 
> independent optimization. The rest we need to do is to pass our rules to HEP 
> planner or VOLCANO planner to get a final plan.

Can we even do that with rules that set up the traits, like primary key or 
retraction/upsert modes?

1. Volcano

Hooking up our three rules, as we have them now for setting up
retractions/upserts, and mixing them with optimiser rules that DO NOT
understand those traits and DO break them would mean that Volcano might
stop searching the plan space while it holds a broken upsert plan
(after firing an optimisation rule, but before firing the rule that fixes/sets
up retraction/upsert). I think one of the assumptions of most (all?)
optimisers (especially iterative ones) is that optimisation rules cannot break
the plan.

On the other hand, if we want our optimisation rules NOT to break the traits,
they must understand them and take them into account in each rewrite that they
do. In that case, there is no point in mixing the rules that set up
retraction/upsert with optimisation; we could just set up
retraction/upsert once, after some stage of planning, and, as I wrote
previously, just make sure that follow-up optimiser rules do not break the traits.

2. Other planners

The same holds true here. We would have to order rules manually and we wouldn’t
be able to arbitrarily interleave rules that set up retractions with other
optimisation rules, unless those rules DO understand and DO NOT break
retraction/upsert/primary key traits. And again, if we are forced anyway to
have three sets of rules:

a. Rules that do not understand retraction/upsert/primary key traits
b. Rules that set up retraction/upsert/primary key traits
c. Rules that do understand retraction/upsert/primary key traits

that are carefully ordered, what’s the point or value of mixing b. with c.?


On the other hand, I see value in keeping b. and c. separate: code
readability and separation of concerns. Right now one has to jump manually
between rules/classes to work out what interacts with what, and to make
assumptions about where those connections end. That’s even visible in the
problem that `UpdateAsRetractionTrait` is leaking outside. It is a purely
intermediate thing that doesn’t have to be exposed. However, as it is
now, it clutters the view for the reader. It’s like a private state/field
that is exposed to other classes. With one shuttle/visitor to set up
retraction/upsert, this intermediate state could be kept completely internal
and hidden from outsiders.

As a side note, having a separate visitor/shuttle to validate/set up some traits
would be helpful when taking optimisation rules that on their own DO break those
traits and converting them into rules that DO NOT break them. We could use such
a visitor/shuttle as a validation step or a clean-up step after a rewrite.
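
A rough sketch of what such a validation pass could look like, on a toy plan
tree rather than on Calcite's RelNode/RelShuttle (ToyNode, its Mode enum and
TraitValidator are all made up for this example, and the consistency check
itself is my assumption about what "consistent" would mean):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy plan node (hypothetical); stands in for a DataStreamRel. */
final class ToyNode {
    enum Mode { APPEND_ONLY, UPSERT, RETRACT }

    final String name;
    final Mode outputMode;                   // how this node emits its changes
    final boolean needsUpdatesAsRetraction;  // does it require retractions from its inputs?
    final List<ToyNode> inputs;

    ToyNode(String name, Mode outputMode, boolean needsUpdatesAsRetraction, ToyNode... inputs) {
        this.name = name;
        this.outputMode = outputMode;
        this.needsUpdatesAsRetraction = needsUpdatesAsRetraction;
        this.inputs = Arrays.asList(inputs);
    }
}

final class TraitValidator {
    /** Walks the plan from the root and returns every inconsistent "input -> parent" edge. */
    static List<String> validate(ToyNode root) {
        List<String> violations = new ArrayList<>();
        visit(root, violations);
        return violations;
    }

    private static void visit(ToyNode node, List<String> violations) {
        for (ToyNode input : node.inputs) {
            // Assumed rule: a parent that needs retractions cannot consume upserts.
            if (node.needsUpdatesAsRetraction && input.outputMode == ToyNode.Mode.UPSERT) {
                violations.add(input.name + " -> " + node.name);
            }
            visit(input, violations);
        }
    }

    public static void main(String[] args) {
        ToyNode source = new ToyNode("source", ToyNode.Mode.APPEND_ONLY, false);
        ToyNode agg = new ToyNode("agg", ToyNode.Mode.UPSERT, false, source);
        ToyNode join = new ToyNode("join", ToyNode.Mode.RETRACT, true, agg);
        System.out.println(validate(join));
    }
}
```

Run after a rewrite, an empty result would mean the rule kept the traits
consistent; a non-empty one points at the edges that need fixing.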

Piotrek


Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-16 Thread Hequn Cheng
Hi Piotr :-)

I think it is better to make rules orthogonal. Each rule focuses on an
independent optimization. All that remains for us is to pass our rules
to the HEP planner or the VOLCANO planner to get a final plan.



Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-13 Thread Piotr Nowojski
Hi,

Maybe this boils down to how we envision plan modifications after setting
up the initial upsert/retraction modes/traits. If we do some plan rewrite
afterwards, do we want to rely on our current dynamic rules to “fix it”? Do we
want to rerun the DataStreamRetractionRules shuttle after rewriting the plan?
Or do we want to guarantee that any plan rewriting rule that we run AFTER
setting up retraction/upsert traits does not break them, but takes them
into account (for example, if we add a new node, we would expect it to have
correctly and consistently set retraction traits with respect to its
parent/children)?

I was thinking about the last approach: rules executed after adding traits
should preserve the consistency of those traits. That’s why I didn’t mind
setting up retraction rules in a shuttle.

Piotrek


Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-05 Thread Hequn Cheng
Hi, thanks for bringing up this discussion.

I agree with unifying the UniqueKeyExtractor and DataStreamRetractionRules;
however, I am not sure that it is a good idea to implement it
with a RelShuttle. Theoretically, retraction rules and other rules may depend
on each other. So, by using a RelShuttle instead of rules, we might lose
the flexibility to perform further optimizations.

As for the join problem, we can solve it with the following two changes:
1. Implement the current UniqueKeyExtractor by adding a FlinkRelMdUniqueKeys
RelMetadataProvider to FlinkDefaultRelMetadataProvider, so that we can get
the unique keys of a RelNode during optimization.
2. Treat the needsUpdatesAsRetraction method in DataStreamRel as an edge
attribute instead of a node attribute. We can implement this with minor
changes. The new needsUpdatesAsRetraction in DataStreamJoin will look like
`def needsUpdatesAsRetraction(input: RelNode): Boolean`.
In needsUpdatesAsRetraction of the join, we can compare the join key with the
unique keys of the input RelNode and return false if the unique keys contain
the join key. In this way, the two input edges of a join can work in
different modes.
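
As a minimal sketch of the per-edge check in change 2: the code below reads
"unique keys contain join key" literally, as set membership, and uses plain
sets of field names instead of RelNode metadata. JoinEdgeSketch and the key
helper are hypothetical; in the real change the unique keys would come from
the metadata provider described in change 1.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class JoinEdgeSketch {
    /**
     * Decides, for ONE input edge of a join, whether that input must send its
     * updates as retractions. Literal reading of the proposal: no retraction is
     * needed when the join key on this side is one of the input's unique keys.
     */
    static boolean needsUpdatesAsRetraction(Set<String> joinKey, Set<Set<String>> inputUniqueKeys) {
        return !inputUniqueKeys.contains(joinKey);
    }

    /** Small helper to build a key from field names. */
    static Set<String> key(String... fields) {
        return new HashSet<>(Arrays.asList(fields));
    }

    public static void main(String[] args) {
        // Left input: grouped by "a", so {a} is assumed to be its unique key.
        Set<Set<String>> leftUniqueKeys = new HashSet<>();
        leftUniqueKeys.add(key("a"));
        // Right input: grouped by "x", so {x} is assumed to be its unique key.
        Set<Set<String>> rightUniqueKeys = new HashSet<>();
        rightUniqueKeys.add(key("x"));

        // Join condition a = y: the left side joins on {a}, the right side on {y}.
        System.out.println("left needs retraction:  "
                + needsUpdatesAsRetraction(key("a"), leftUniqueKeys));
        System.out.println("right needs retraction: "
                + needsUpdatesAsRetraction(key("y"), rightUniqueKeys));
    }
}
```

With these inputs the left edge can stay in upsert mode while the right edge
needs retractions, which is the mixed-mode join the edge attribute is meant
to allow.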

Best, Hequn.


Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-05 Thread Rong Rong
+1 on the refactoring.

I spent some time a while back trying to get a better understanding of the
several rules mentioned here.
Correct me if I'm wrong, but I was under the impression that the reason
the rules are split is that AccMode and UpdateMode are the ones
that we care about and the "NeedToRetract" was only the "intermediate"
indicator. I guess that's the part that confuses me the most.

Another thing that confuses me is whether we can mix the modes of operators
and, while traversing the plan, pick the "least restrictive" mode, as
@piotr mentioned, if operators can support both upserts and retractions, as
in [2b] (the 2nd [2a]).

--
Rong



On Tue, Jun 5, 2018 at 2:35 AM, Fabian Hueske  wrote:

> Hi,
>
> I think the proposed refactoring is a good idea.
> It should simplify the logic to determine which update mode to use.
> We could also try to make some of the method and field names more intuitive
> and extend the internal documentation a bit.
>
> @Hequn, It would be good to get your thoughts on this issue as well. Thank
> you!
>
> While thinking about this issue I noticed a severe bug in how filters
> handle upsert messages.
> I've opened FLINK-9528 [1] for that.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9528
>
> 2018-06-04 10:23 GMT+02:00 Timo Walther :
>
> > Hi Piotr,
> >
> > thanks for bringing up this discussion. I was not involved in the design
> > discussions at that time but I also find the logic about upserts and
> > retractions in multiple stages quite confusing. So in general +1 for
> > simplification; however, by using a RelShuttle instead of rules we might
> > lose the flexibility to perform further optimizations by introducing new
> > rules in the future. Users could not change the static logic in a
> > RelShuttle, whereas right now they can influence the behaviour using
> > CalciteConfig and custom rules.
> >
> > Regards,
> > Timo
> >
> > On 01.06.18 at 13:26, Piotr Nowojski wrote:
> >
> >> Hi,
> >>
> >> Recently I was looking into upserts and upsert sources in Flink and,
> >> while doing so, I noticed some potential room for
> >> improvement/simplification.
> >>
> >> Currently there are 3 optimiser rules in DataStreamRetractionRules that
> >> work in three stages, followed by the UniqueKeyExtractor plan node
> >> visitor, to set the preferred update mode, with validation of correct
> >> keys for upserts. First, DataStreamRetractionRules sets up
> >> UpdateAsRetractionTrait; next, in another rule, we use it to set up
> >> AccModeTrait. AccModeTrait has only two values: Acc (upserts) or
> >> AccRetract (retractions). This has some severe limitations and requires
> >> the additional stage of UniqueKeyExtractor (implemented as a visitor) to
> >> actually verify that keys are set correctly.
> >>
> >> I would propose to unify those into one visitor (probably a RelShuttle
> >> implementation) that would traverse the plan from root -> leaves. On the
> >> way down it would collect the preferences of the nodes regarding update
> >> mode (including keys for upserts). On the way up, it would pick
> >> upsert(keys)/retraction/append-only modes or fail if that was impossible
> >> [1].
> >>
> >> I think that would simplify the code by a noticeable margin. Instead of
> >> having this logic distributed among 4 classes in two files/independent
> >> steps, it would be in one simple class.
> >>
> >> It would open up the possibility of further improvements. For operators
> >> that could process both upserts and retractions (with the aforementioned
> >> solution that decides upsert vs retract in the same step as validating
> >> keys) we could choose upserts if the keys match and fall back to
> >> retractions only if they don't. Right now that isn’t possible (examples
> >> [2a], [2b]).
> >>
> >> Thanks Piotrek
> >>
> >> [1] Example impossible case:
> >>
> >> DataStream<Tuple3<Integer, Long, String>> ds1 =
> >>     JavaStreamTestData.getSmall3TupleDataSet(env);
> >> Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
> >>     .select("a.cast(LONG) as a,b,c");
> >>
> >> DataStream<Tuple3<Integer, Long, String>> ds2 =
> >>     JavaStreamTestData.getSmall3TupleDataSet(env);
> >> Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");
> >>
> >> Table g1 = t1.groupBy("a").select("a, b.count");
> >> Table g2 = t2.groupBy("b").select("a.count as a, b");
> >>
> >> g1.unionAll(g2).writeToSink(
> >>     new TestUpsertSink(new String[]{("a")}, false));
> >>
> >> [2a]
> >>
> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
> >>
> >> val g1 = t1.groupBy("a").select("a, b.count")
> >> val g2 = t2.groupBy("y").select("x.count, y")
> >>
> >> val resultTable = g1.join(g2, "a=y")
> >>
> >> `g1.join(g2, "a=y")` could accept upserts from both sides. Now both are
> >> retractions.
> >>
> >> [2a]
> >>
> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
> >>
> >> val g1 = t1.groupBy("a").select("a, b.count")
> >> val g2 = t2.groupBy("x").select("x, y.count as y")
> >>
> >> val resultTable = g1.join(g2, "a=y")
> >>

Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-05 Thread Fabian Hueske
Hi,

I think the proposed refactoring is a good idea.
It should simplify the logic to determine which update mode to use.
We could also try to make some of the method and field names more intuitive
and extend the internal documentation a bit.

@Hequn, It would be good to get your thoughts on this issue as well. Thank
you!

While thinking about this issue I noticed a severe bug in how filters
handle upsert messages.
I've opened FLINK-9528 [1] for that.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9528

2018-06-04 10:23 GMT+02:00 Timo Walther :

> Hi Piotr,
>
> thanks for bringing up this discussion. I was not involved in the design
> discussions at that time but I also find the logic about upserts and
> retractions in multiple stages quite confusing. So in general +1 for
> simplification, however, by using a RelShuttle instead of rules we might
> loose the flexiblity to perform further optimizations by introducing new
> rules in the future. Users could not change the static logic in a
> RelShuttle, right now they can influence the behaviour using CalciteConfig
> and custom rules.
>
> Regards,
> Timo
>
> Am 01.06.18 um 13:26 schrieb Piotr Nowojski:
>
> Hi,
>>
>> Recently I was looking into upserts and upserts sources in Flink and
>> while doing so, I noticed some potential room for
>> improvement/simplification.
>>
>> Currently there are 3 optimiser rules in DataStreamRetractionRules that
>> work in three stages, followed by the UniqueKeyExtractor plan node visitor,
>> to set the preferred update mode, with validation of correct keys for
>> upserts. First, DataStreamRetractionRules sets up UpdateAsRetractionTrait;
>> next, another rule uses it to set up AccModeTrait. AccModeTrait has only
>> two values: Acc (upserts) or AccRetract (retractions). This has some severe
>> limitations and requires an additional UniqueKeyExtractor stage
>> (implemented as a visitor) to actually verify that the keys are set
>> correctly.
>>
>> I would propose to unify those into one visitor (probably a RelShuttle
>> implementation) that would traverse the plan from the root to the leaves.
>> On the way down it would collect the preferences of the nodes regarding
>> update mode (including keys for upserts). On the way up, it would pick
>> upsert(keys)/retraction/append-only modes or fail if that was impossible
>> [1].
>>
>> I think that would simplify the code by a noticeable margin. Instead of
>> having this logic distributed among 4 classes in two files/independent
>> steps, it would be in one simple class.
>>
>> It would also open up the possibility of further improvements. For
>> operators that could process either upserts or retractions (with the
>> aforementioned solution that decides upsert vs retract in the same step as
>> validating keys) we could choose upserts if the keys match and fall back to
>> retractions only if they don't. Currently this is not possible (examples
>> [2a], [2b]).
>>
>> Thanks Piotrek
>>
>> [1] Example impossible case:
>>
>> DataStream<Tuple3<Integer, Long, String>> ds1 =
>>     JavaStreamTestData.getSmall3TupleDataSet(env);
>> Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
>>     .select("a.cast(LONG) as a,b,c");
>>
>> DataStream<Tuple3<Integer, Long, String>> ds2 =
>>     JavaStreamTestData.getSmall3TupleDataSet(env);
>> Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");
>>
>> Table g1 = t1.groupBy("a").select("a, b.count");
>> Table g2 = t2.groupBy("b").select("a.count as a, b");
>>
>> g1.unionAll(g2).writeToSink(
>>     new TestUpsertSink(new String[]{"a"}, false));
>>
>> [2a]
>>
>> val t1 = util.addTable[(Long, Long)]('a, 'b)
>> val t2 = util.addTable[(Long, Long)]('x, 'y)
>>
>> val g1 = t1.groupBy("a").select("a, b.count")
>> val g2 = t2.groupBy("y").select("x.count, y")
>>
>> val resultTable = g1.join(g2, "a=y")
>>
>> `g1.join(g2, "a=y")` could accept upserts from both sides. Now both are
>> retractions.
>>
>> [2b]
>>
>> val t1 = util.addTable[(Long, Long)]('a, 'b)
>> val t2 = util.addTable[(Long, Long)]('x, 'y)
>>
>> val g1 = t1.groupBy("a").select("a, b.count")
>> val g2 = t2.groupBy("x").select("x, y.count as y")
>>
>> val resultTable = g1.join(g2, "a=y")
>>
>> `g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but
>> only retractions from g2 (different key columns). Now both are retractions.
>>
>>
>


Re: [TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-04 Thread Timo Walther

Hi Piotr,

thanks for bringing up this discussion. I was not involved in the design 
discussions at that time but I also find the logic about upserts and 
retractions in multiple stages quite confusing. So in general +1 for
simplification. However, by using a RelShuttle instead of rules we might
lose the flexibility to perform further optimizations by introducing new
rules in the future. Users could not change the static logic in a
RelShuttle, whereas right now they can influence the behaviour using
CalciteConfig and custom rules.


Regards,
Timo

On 01.06.18 at 13:26, Piotr Nowojski wrote:

Hi,

Recently I was looking into upserts and upsert sources in Flink and while
doing so, I noticed some potential room for improvement/simplification.

Currently there are 3 optimiser rules in DataStreamRetractionRules that work in
three stages, followed by the UniqueKeyExtractor plan node visitor, to set the
preferred update mode, with validation of correct keys for upserts. First,
DataStreamRetractionRules sets up UpdateAsRetractionTrait; next, another rule
uses it to set up AccModeTrait. AccModeTrait has only two values: Acc (upserts)
or AccRetract (retractions). This has some severe limitations and requires an
additional UniqueKeyExtractor stage (implemented as a visitor) to actually
verify that the keys are set correctly.

I would propose to unify those into one visitor (probably a RelShuttle
implementation) that would traverse the plan from the root to the leaves. On
the way down it would collect the preferences of the nodes regarding update
mode (including keys for upserts). On the way up, it would pick
upsert(keys)/retraction/append-only modes or fail if that was impossible [1].
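
The two-pass idea can be sketched in plain Java as below. This is only an illustration under stated assumptions: `Node`, `Accepts`, and `resolve` are hypothetical names, no Calcite or Flink classes are used, and pass-through operators (union, filter) are not modelled. Each node hands its own input preference down to its inputs, and a mode is picked for every node while the recursion returns.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class UpdateModeSketch {
    enum Mode { APPEND, UPSERT, RETRACT }

    /** What a consumer tolerates from its inputs (hypothetical type). */
    static final class Accepts {
        final Set<String> upsertKey;   // null: upserts are not accepted
        final boolean retractions;
        Accepts(Set<String> upsertKey, boolean retractions) {
            this.upsertKey = upsertKey;
            this.retractions = retractions;
        }
    }

    /** Hypothetical plan node; a stand-in for a RelNode, not the Calcite class. */
    static final class Node {
        final String name;
        final Set<String> updateKey;   // key of emitted updates, null if append-only
        final Accepts accepts;         // what this node can consume from its inputs
        final List<Node> inputs;
        Mode mode;                     // assigned on the way up

        Node(String name, Set<String> updateKey, Accepts accepts, Node... inputs) {
            this.name = name;
            this.updateKey = updateKey;
            this.accepts = accepts;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Walks root to leaves, then assigns a mode to each node while returning. */
    static void resolve(Node node, Accepts consumer) {
        // Way down: every input learns what this node can consume.
        for (Node input : node.inputs) {
            resolve(input, node.accepts);
        }
        // Way up: prefer upserts when keys match, otherwise fall back or fail.
        if (node.updateKey == null) {
            node.mode = Mode.APPEND;
        } else if (node.updateKey.equals(consumer.upsertKey)) {
            node.mode = Mode.UPSERT;
        } else if (consumer.retractions) {
            node.mode = Mode.RETRACT;
        } else {
            throw new IllegalStateException(node.name + " emits updates keyed on "
                + node.updateKey + ", which its consumer cannot accept");
        }
    }

    public static void main(String[] args) {
        // Assumption: an upsert sink keyed on "a" that cannot take retractions.
        Accepts upsertSinkOnA = new Accepts(Collections.singleton("a"), false);
        Accepts aggInput = new Accepts(null, true);

        Node g1 = new Node("g1", Collections.singleton("a"), aggInput,
            new Node("t1", null, null));
        resolve(g1, upsertSinkOnA);
        System.out.println(g1.name + " -> " + g1.mode);   // g1 -> UPSERT

        Node g2 = new Node("g2", Collections.singleton("b"), aggInput,
            new Node("t2", null, null));
        try {
            resolve(g2, upsertSinkOnA);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Here `g2` plays the role of the mismatched branch in [1]: its updates are keyed on `b`, the sink wants upserts on `a` and takes no retractions, so the traversal fails instead of silently picking a wrong mode.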

I think that would simplify the code by a noticeable margin. Instead of having
this logic distributed among 4 classes in two files/independent steps, it would 
be in one simple class.

It would also open up the possibility of further improvements. For operators
that could process either upserts or retractions (with the aforementioned
solution that decides upsert vs retract in the same step as validating keys) we
could choose upserts if the keys match and fall back to retractions only if
they don't. Currently this is not possible (examples [2a], [2b]).

Thanks Piotrek

[1] Example impossible case:

DataStream<Tuple3<Integer, Long, String>> ds1 =
    JavaStreamTestData.getSmall3TupleDataSet(env);
Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
    .select("a.cast(LONG) as a,b,c");

DataStream<Tuple3<Integer, Long, String>> ds2 =
    JavaStreamTestData.getSmall3TupleDataSet(env);
Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");

Table g1 = t1.groupBy("a").select("a, b.count");
Table g2 = t2.groupBy("b").select("a.count as a, b");

g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{"a"}, false));

[2a]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("y").select("x.count, y")

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from both sides. Now both are
retractions.

[2b]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("x").select("x, y.count as y")

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but only
retractions from g2 (different key columns). Now both are retractions.
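
The per-input decision described for the two join examples can be written down as a small Java sketch. It assumes that "matching" means the input's unique key equals the join key on that side; `modeFor` and `JoinInputModeSketch` are hypothetical names, not Flink API.

```java
import java.util.Collections;
import java.util.Set;

public class JoinInputModeSketch {
    enum Mode { UPSERT, RETRACT }

    /**
     * Picks the update mode for one join input.
     * Assumption: upserts are usable only when the input's unique key
     * is exactly the join key on that side; otherwise retract.
     */
    static Mode modeFor(Set<String> inputUniqueKey, Set<String> joinKeyOnThatSide) {
        return inputUniqueKey.equals(joinKeyOnThatSide) ? Mode.UPSERT : Mode.RETRACT;
    }

    public static void main(String[] args) {
        Set<String> a = Collections.singleton("a");
        Set<String> x = Collections.singleton("x");
        Set<String> y = Collections.singleton("y");

        // First example: g1 grouped by a, g2 grouped by y, join on a=y.
        System.out.println(modeFor(a, a) + " / " + modeFor(y, y)); // UPSERT / UPSERT

        // Second example: g2 grouped by x, but joined on y.
        System.out.println(modeFor(a, a) + " / " + modeFor(x, y)); // UPSERT / RETRACT
    }
}
```

Deciding this in the same step that validates keys is what lets each side of the join be handled separately, instead of forcing both inputs to retractions.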