Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-20 Thread David Anderson
I'm delighted to see the progress on this. This is going to be a major
enabler for some important use cases.

The proposed simplifications (global config and ordered mode) for V1 make a
lot of sense to me. +1

David


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-20 Thread Alan Sheinberg
Thanks for that feedback Lincoln,

> Only one question about the async `timeout` parameter [1] (since I
> haven't seen the POC code). The current description is 'The time which can
> pass before a restart strategy is triggered',
> but in the previous FLIP-232 [2] and FLIP-234 [3], in the retry scenario, this
> timeout is the total time. Do we keep the behavior of the parameter
> consistent?

That's a good catch.  I was intending to use *AsyncWaitOperator*, and to
pass this timeout directly.  Looking through the code a bit, it appears
that it doesn't restart the timer on a retry, and this timeout is total, as
you're saying.  I do intend on being consistent with the other FLIPs and
retaining this behavior, so I will update the wording on my FLIP to reflect
that.
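
To make the "total" semantics concrete, here is a minimal sketch in plain Java.
It is not the AsyncWaitOperator code; the class and method names are
hypothetical and the timing is simulated. The assumption it illustrates is
that the timer starts once with the first attempt and is never reset, so
retries only consume what is left of the same budget.

```java
import java.util.function.IntPredicate;

/** Hypothetical illustration of a total (not per-attempt) timeout; not Flink code. */
public class TotalTimeoutSketch {

    /**
     * Simulates attempts against one shared timeout budget.
     *
     * @return the attempt number that succeeded, or -1 if the budget ran out first
     */
    static int attemptsUntilSuccess(
            long totalTimeoutMs, long attemptCostMs, IntPredicate succeedsOnAttempt) {
        long elapsedMs = 0; // started once; a retry does not reset it
        int attempt = 0;
        while (elapsedMs + attemptCostMs <= totalTimeoutMs) {
            attempt++;
            elapsedMs += attemptCostMs;
            if (succeedsOnAttempt.test(attempt)) {
                return attempt;
            }
        }
        return -1; // the total timeout is exceeded before another attempt fits
    }

    public static void main(String[] args) {
        // 1000 ms budget, 400 ms per attempt: only two attempts fit.
        System.out.println(attemptsUntilSuccess(1000, 400, a -> a == 2)); // prints 2
        System.out.println(attemptsUntilSuccess(1000, 400, a -> a == 3)); // prints -1
    }
}
```

With a per-attempt timeout the third attempt would still run; with a total
timeout it does not.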

-Alan


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-20 Thread Lincoln Lee
+1 for this useful feature!
I hope this reply isn't too late. I agree that we should start with the global
async-scalar configuration and ordered mode first.

@Alan, only one question about the async `timeout` parameter [1] (since I
haven't seen the POC code). The current description is 'The time which can
pass before a restart strategy is triggered',
but in the previous FLIP-232 [2] and FLIP-234 [3], in the retry scenario, this
timeout is the total time. Do we keep the behavior of the parameter
consistent?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems

Best,
Lincoln Lee



Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-19 Thread Alan Sheinberg
Thanks for the comments Timo.


> Can you remove the necessary parts? Esp.:
>
>   @Override
>   public Set<FunctionRequirement> getRequirements() {
>       return Collections.singleton(FunctionRequirement.ORDERED);
>   }


I removed this section from the FLIP, since presumably there's no use in
adding it to the public API if it's ignored while the first version handles
only ORDERED.  I'm not sure how quickly I'll want to add UNORDERED
support, but I guess I can always do another FLIP.

> Otherwise I have no objections to start a VOTE soonish. If others are
> fine as well?

That would be great.  Any areas that people are interested in discussing
further before a vote?

-Alan


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-19 Thread Timo Walther

> I would be totally fine with the first version only having ORDERED
> mode. For a v2, we could attempt to do the next most conservative
> thing

Sounds good to me.

I also checked AsyncWaitOperator and could not find an access of
StreamRecord's timestamp, only of watermarks. But as we said, let's
focus on ORDERED first.


Can you remove the necessary parts? Esp.:

@Override
public Set<FunctionRequirement> getRequirements() {
    return Collections.singleton(FunctionRequirement.ORDERED);
}

Otherwise I have no objections to start a VOTE soonish. If others are 
fine as well?


Regards,
Timo



Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-18 Thread Alan Sheinberg
Thanks for the helpful comments, Xuyang and Timo.

> @Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as
> source and MySQL as sink as an example.
> Although Kafka is an append-only source, one of its fields is used as pk
> when writing to MySQL. If the async udx is executed
> in an unordered mode, there may be problems with the data in MySQL in the
> end. In this case, we need to ensure that
> rows for the same sink pk actually stay in order.


@Xuyang: That's a great point.  If some node downstream of my operator
cares about ordering, there's no way for it to reconstruct the original
ordering of the rows as they were input to my operator.  So even if they
want to preserve ordering by key, the order in which they see it may
already be incorrect.  Somehow I thought that maybe the analysis of the
changelog mode at a given operator was aware of downstream operations, but
it seems not.

> Clear "no" on this. Changelog semantics make the planner complex and we
> need to be careful. Therefore I would strongly suggest we introduce
> ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
> plans with appropriate planner rules that guard it.


@Timo: The better I understand the complexity, the more I agree with this.
I would be totally fine with the first version only having ORDERED mode.
For a v2, we could attempt to do the next most conservative thing and only
allow UNORDERED when the whole graph is in *INSERT* changelog mode.  The
next best type of optimization might understand what's the key required
downstream, and allow breaking the original order only between unrelated
keys, but maintaining it between rows of the same key.  Of course if the
key used downstream is computed in some manner, that makes it all the
harder to know this beforehand.
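
The "next most conservative thing" above could be pictured as a planner-side
check along the following lines. This is only a sketch under assumptions: the
`Kind` and `Mode` enums and `chooseMode` are hypothetical names made up for
illustration, not existing planner API, and a real rule would have to look at
the actual plan nodes.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of an "INSERT-only graph" guard; not Flink planner code. */
public class AsyncModeSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Mode { ORDERED, UNORDERED }

    /** Allows UNORDERED only if every node in the graph emits nothing but INSERT rows. */
    static Mode chooseMode(List<? extends Set<Kind>> changelogModePerNode) {
        for (Set<Kind> kinds : changelogModePerNode) {
            if (!kinds.equals(EnumSet.of(Kind.INSERT))) {
                return Mode.ORDERED; // any update or delete anywhere: stay conservative
            }
        }
        return Mode.UNORDERED;
    }

    public static void main(String[] args) {
        Set<Kind> insertOnly = EnumSet.of(Kind.INSERT);
        Set<Kind> upsert = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER, Kind.DELETE);

        System.out.println(chooseMode(Arrays.asList(insertOnly, insertOnly))); // prints UNORDERED
        System.out.println(chooseMode(Arrays.asList(insertOnly, upsert)));     // prints ORDERED
    }
}
```

Note that the second call covers the Kafka-to-MySQL case: the source is
append-only, but the upsert sink downstream forces ORDERED.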

> So unordering should be fine *within* watermarks. This is also what
> watermarks are good for, a trade-off between strict ordering and making
> progress. The async operator from DataStream API also supports this if I
> remember correctly. However, it assumes a timestamp is present in
> StreamRecord on which it can work. But this is not the case within the
> SQL engine.


*AsyncWaitOperator* and *UnorderedStreamElementQueue* (the implementations
I plan on using) seem to support exactly this behavior.  I don't think they
make assumptions about the record's timestamp, but just preserve whatever
the input order is w.r.t. watermarks.  I'd be curious to understand the
timestamp use in more detail and see if it's required with the mentioned
classes.

> TLDR: Let's focus on ORDERED first.


I'm more than happy to start here and we can consider UNORDERED as a
followup.  Then maybe we consider only INSERT mode graphs and ones where we
can solve the watermark constraints.

Thanks,
Alan



Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-18 Thread Timo Walther

Hi Xuyang and Alan,

thanks for this productive discussion.

> Would it make a difference if it were exposed by the explain

@Alan: I think this is a great idea. +1 on exposing the sync/async
behavior through EXPLAIN.



> Is there an easy way to determine if the output of an async function
> would be problematic or not?

Clear "no" on this. Changelog semantics make the planner complex and we 
need to be careful. Therefore I would strongly suggest we introduce 
ORDERED and slowly enable UNORDERED whenever we see a good fit for it in 
plans with appropriate planner rules that guard it.


> If the input to the operator is append-only, it seems fine, because
> this implies that each row is effectively independent and ordering is
> unimportant.

As @Xuyang pointed out, an append-only input alone does not decide whether
this is safe. It's also the subsequent operators in the pipeline.
Xuyang's example, where the sink operates in upsert mode, is a good one.
Append-only source, append-only operators, and append-only sink
are safer.


However, even in this combination, a row is not fully "independent",
since there are still watermarks flowing between rows:


R(5), W(4), R(3), R(4), R(2), R(1), W(0)

So unordering should be fine *within* watermarks. This is also what 
watermarks are good for, a trade-off between strict ordering and making 
progress. The async operator from DataStream API also supports this if I 
remember correctly. However, it assumes a timestamp is present in 
StreamRecord on which it can work. But this is not the case within the 
SQL engine.
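
To make "unordering within watermarks" concrete, here is a minimal sketch in plain Java. It is not code from Flink's async operator. The string encoding ("W4" for a watermark, "R3" for a record) and all class and method names are assumptions made for illustration, and the example stream above is assumed to be read right to left (oldest element on the right).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not Flink's AsyncWaitOperator code.
class WatermarkSegments {

    // Returns the watermark elements (those starting with "W") in stream order.
    static List<String> watermarks(List<String> stream) {
        List<String> result = new ArrayList<>();
        for (String element : stream) {
            if (element.startsWith("W")) {
                result.add(element);
            }
        }
        return result;
    }

    // Splits the stream into groups of records, using each watermark as a boundary.
    static List<List<String>> segments(List<String> stream) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String element : stream) {
            if (element.startsWith("W")) {
                result.add(current);
                current = new ArrayList<>();
            } else {
                current.add(element);
            }
        }
        result.add(current);
        return result;
    }

    // True if output keeps all watermarks in place and only permutes records
    // inside the segment they originally belonged to.
    static boolean isValidReordering(List<String> input, List<String> output) {
        if (!watermarks(input).equals(watermarks(output))) {
            return false;
        }
        List<List<String>> in = segments(input);
        List<List<String>> out = segments(output);
        for (int i = 0; i < in.size(); i++) {
            List<String> a = new ArrayList<>(in.get(i));
            List<String> b = new ArrayList<>(out.get(i));
            Collections.sort(a);
            Collections.sort(b);
            if (!a.equals(b)) {
                return false;
            }
        }
        return true;
    }
}
```

With the stream W0, R1, R2, R4, R3, W4, R5, emitting R4 before R1 would be accepted by this check, while emitting R3 after W4 would not, because the record crossed a watermark.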


TLDR: Let's focus on ORDERED first.

If we want to use UNORDERED, I would suggest checking the input operator
for exactly 1 time attribute column. If there is exactly 1 time
attribute column, we could insert it into the StreamRecord and allow
UNORDERED mode. If this condition is not met, we go with ORDERED.
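
The rule suggested above could be sketched roughly as follows. This is plain Java with hypothetical names (AsyncOutputModeSketch, OutputModeChooser); a real planner rule would inspect the input row type rather than a list of column names, so treat it only as an outline of the decision.

```java
import java.util.List;

// Hypothetical names; not part of Flink's planner.
enum AsyncOutputModeSketch { ORDERED, UNORDERED }

class OutputModeChooser {

    // UNORDERED only when the input exposes exactly one time attribute column
    // (which could then be carried in the StreamRecord); otherwise ORDERED.
    static AsyncOutputModeSketch choose(List<String> timeAttributeColumns) {
        return timeAttributeColumns.size() == 1
                ? AsyncOutputModeSketch.UNORDERED
                : AsyncOutputModeSketch.ORDERED;
    }
}
```

Falling back to ORDERED for zero or several time attributes keeps the conservative mode as the default whenever the condition is not clearly met.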


Regards,
Timo




On 18.12.23 07:05, Xuyang wrote:

Hi, Alan and Timo. Thanks for your reply.

Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)?

@Alan: I think this is a good way to tell the user what mode these async udx 
are currently in.

A regular SQL user doesn't care whether the function is sync or async.

@Timo: I agree that the planner should throw as few exceptions as possible 
rather than confusing users. So I think
it is a good way to expose syncMode through explain syntax.

If the input to the operator is append-only, it seems fine,
because this implies that each row is effectively independent and ordering is 
unimportant.




For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
I don't see a reason why the operator is not allowed to produce unordered
results.



@Timo, @Alan: IIUC, there seems to be a problem here. Take Kafka as the
source and MySQL as the sink as an example.
Although Kafka is an append-only source, one of its fields is used as the
primary key when writing to MySQL. If the async udx is executed
in unordered mode, the data in MySQL may end up incorrect.
In this case, we actually need to ensure that
rows are kept in order per sink primary key.
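
The Kafka-to-MySQL concern can be illustrated with a small plain-Java simulation. The class below is hypothetical and stands in for an upsert sink keyed by primary key; it is not connector code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an upsert sink; not a real connector.
class UpsertSinkSketch {

    // Applies {key, value} rows the way an upsert sink would:
    // the last write for a given key wins.
    static Map<String, String> apply(List<String[]> rows) {
        Map<String, String> table = new HashMap<>();
        for (String[] row : rows) {
            table.put(row[0], row[1]);
        }
        return table;
    }
}
```

If two append-only rows share the key "k1" and arrive as v1 then v2, the table ends with v2. If an unordered async operator emits them as v2 then v1, the table ends with v1, even though the source itself was append-only.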



--

 Best!
 Xuyang





At 2023-12-16 03:33:55, "Alan Sheinberg"  
wrote:

Thanks for the replies everyone.  My responses are inline:

About the configs, what do you think using hints as mentioned in [1].

@Aitozi: I think hints could be a good way to do this, similar to lookup
joins or the proposal in FLIP-313.  One benefit of hints is that they allow
for the highest granularity of configuration because you can decide at
each and every call site just what parameters to use.  The downside of
hints is that there's more syntax to learn and more verbosity.  I'm
somewhat partial to a configuration like this with a class definition level
of granularity (similar to how metrics reporters are defined [1]):

table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
table.exec.async-scalar.myfunc.buffer-capacity: 10
...

As Timo mentioned, the downside to this is that there's not a nice static
way to do this at the moment, unless you extend ConfigOption.  It would be
good ultimately if Lookup joins, async scalar functions, and other future
configurable UDFs shared the same methodology, but maybe a unified approach
is a followup discussion.

I’m just curious why you don’t use conf(global) and query hint(individual
async udx) to mark the output
mode 'order' or 'unorder' like async lookup join [1] and async udtf[2], but
chose to introduce a new enum
in AsyncScalarFunction.



@Xuyang: I'm open to adding hints. I think the important part is that we
have some method for the user to have a class definition level way to
define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
have a strong sense yet for what would be most appropriately exposed as a
FunctionRequirement vs a simple configuration/hint.

What about throwing an exception to make it clear to users that using async


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-15 Thread Alan Sheinberg
Thanks for the replies everyone.  My responses are inline:

> About the configs, what do you think using hints as mentioned in [1].

@Aitozi: I think hints could be a good way to do this, similar to lookup
joins or the proposal in FLIP-313.  One benefit of hints is that they allow
for the highest granularity of configuration because you can decide at
each and every call site just what parameters to use.  The downside of
hints is that there's more syntax to learn and more verbosity.  I'm
somewhat partial to a configuration like this with a class definition level
of granularity (similar to how metrics reporters are defined [1]):

table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
table.exec.async-scalar.myfunc.buffer-capacity: 10
...

As Timo mentioned, the downside to this is that there's not a nice static
way to do this at the moment, unless you extend ConfigOption.  It would be
good ultimately if Lookup joins, async scalar functions, and other future
configurable UDFs shared the same methodology, but maybe a unified approach
is a followup discussion.

> I’m just curious why you don’t use conf(global) and query hint(individual
> async udx) to mark the output
> mode 'order' or 'unorder' like async lookup join [1] and async udtf[2], but
> chose to introduce a new enum
> in AsyncScalarFunction.


@Xuyang: I'm open to adding hints. I think the important part is that we
have some method for the user to have a class definition level way to
define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
have a strong sense yet for what would be most appropriately exposed as a
FunctionRequirement vs a simple configuration/hint.

> What about throwing an exception to make it clear to users that using async
> scalar functions in this situation
> is problematic instead of executing silently in sync mode? Because users
> may be confused about
> the final actual job graph.


@Xuyang: Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)?  I'd be fine to do it
either way -- certainly throwing an error is a bit simpler.

> You are right. Actually it should be the planner that fully decides
> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> operator is not allowed to produce unordered results. By global
> configuration, we can set ORDERED such that users don't get confused
> about the unordered output.


@Timo: Is there an easy way to determine if the output of an async function
would be problematic or not?  If the input to the operator is append-only,
it seems fine, because this implies that each row is effectively
independent and ordering is unimportant. For upsert mode with +U rows, you
wouldn't want to swap order with other +U rows for the same key because the
last one should win.  For -D or -U rows, you wouldn't want to swap with
other rows for the same key for similar reasons.  Is it as simple as
looking for the changelog mode to determine whether it's safe to run async
functions UNORDERED?  I had considered analyzing various query forms (join
vs aggregation vs whatever), but it seems like changelog mode could be
sufficient to understand what works and what would be an issue.  Any code
pointers and explanation for similar analysis would be great to understand
this more.
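
As a rough sketch of the changelog-mode check asked about above: the enum below is a hypothetical stand-in for Flink's row kinds (+I, -U, +U, -D), and the class name is invented. As raised elsewhere in this thread, an insert-only input may be necessary rather than sufficient, because downstream operators such as an upsert sink also matter.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the row kinds +I, -U, +U, -D.
enum RowKindSketch { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

class ChangelogCheckSketch {

    // True only when the input changelog contains nothing but inserts,
    // i.e. the input is append-only.
    static boolean isInsertOnly(Set<RowKindSketch> kinds) {
        return kinds.equals(EnumSet.of(RowKindSketch.INSERT));
    }
}
```

A changelog containing +U or -D rows fails the check, matching the concern that updates or deletes for the same key must not be swapped.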

> The mode UNORDERED however should only have
> effect for these simple use cases and throw an exception if UNORDERED
> would mess up a changelog or other subsequent operators.

@Timo: Should we throw errors or run in sync mode?  It seems like running
in sync mode is an option to ensure correctness in all changelog modes.

> Let's go with global configuration first and later introduce
> hints. I feel the more hints we introduce, the harder SQL queries get
> when maintaining them.

@Timo: That seems like a reasonable approach to me.

-Alan

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/

On Fri, Dec 15, 2023 at 2:56 AM Timo Walther  wrote:

> 1. Override the function `getRequirements` in `AsyncScalarFunction`
>
>  > If the user overrides `requirements()` to omit the `ORDERED`
>  > requirement, do we allow the operator to return out-of-order results
>  > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
>  > behavior (where we allow out-of-order only if it's deemed correct)?
>
> You are right. Actually it should be the planner that fully decides
> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> operator is not allowed to produce unordered results. By global
> configuration, we can set ORDERED such that users don't get confused
> about the unordered output. The mode UNORDERED however should only have
> effect for these simple use cases and throw an exception if UNORDERED
> would mess up a changelog or other subsequent 

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-15 Thread Timo Walther

1. Override the function `getRequirements` in `AsyncScalarFunction`

> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides 
whether ORDERED or UNORDERED is safe to do. For example, if the query is 
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the 
operator is not allowed to produce unordered results. By global 
configuration, we can set ORDERED such that users don't get confused 
about the unordered output. The mode UNORDERED however should only have
effect for these simple use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.


2. In some scenarios with semantic correctness, async operators must be 
executed in sync mode.


> What about throwing an exception to make it clear to users that using
> async scalar functions


@Xuyang: A regular SQL user doesn't care whether the function is sync or
async. The planner should simply do its best to make the execution
performant. I would not throw an exception here. The more exceptions
there are, the more struggles and questions from the user. Conceptually, we
can also run async code synchronously, and that's what we should do to
avoid errors.
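
The idea that async code can also be run synchronously can be sketched in plain Java. The eval shape (a CompletableFuture passed as the first argument) and the class name are assumptions for illustration; the sketch only shows that blocking on each result yields row-at-a-time, in-order behavior.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example; not the operator implementation.
class SyncFallbackSketch {

    // Assumed async function: completes the future with the upper-cased input
    // from another thread.
    static void evalAsync(CompletableFuture<String> result, String input) {
        CompletableFuture.runAsync(() -> result.complete(input.toUpperCase()));
    }

    // Sync fallback: issue one call, then block until its result is available.
    static String evalSync(String input) {
        CompletableFuture<String> result = new CompletableFuture<>();
        evalAsync(result, input);
        return result.join();
    }
}
```

Because each call waits for its own result before the next row is processed, ordering is trivially preserved, at the cost of the throughput benefit.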


3. Hints

@Aitozi: Let's go with global configuration first and later introduce 
hints. I feel the more hints we introduce, the harder SQL queries get 
when maintaining them.


Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:

Hi, Alan. Thanks for driving this.


Using async to improve throughput has already been done for lookup join, and the
improvement is obvious, so I think it makes sense to support async scalar functions.
Big +1 for this FLIP.
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I’m just curious why you don’t use conf (global) and query hints (individual async
udx) to mark the output mode 'order' or 'unorder' like async lookup join [1] and
async udtf [2], but chose to introduce a new enum in AsyncScalarFunction.


2. In some scenarios with semantic correctness, async operators must be 
executed in sync mode.


What about throwing an exception to make it clear to users that using async 
scalar functions in this situation
is problematic instead of executing silently in sync mode? Because users may be 
confused about
the final actual job graph.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

 Best!
 Xuyang





At 2023-12-15 11:20:24, "Aitozi" wrote:

Hi Alan,
Nice FLIP, I also explored leveraging the async table function[1] to
improve the throughput before.

About the configs, what do you think about using hints as mentioned in [1]?

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

On Thu, Dec 14, 2023 at 17:29, Timo Walther wrote:


Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be in particular interesting for
accessing REST endpoints and other remote services.

Great that we can generalize and reuse parts of the Python planner rules
and code for this.

I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class which is required in the layered configuration. For
now I would suggest to only allow a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).

2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.

I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully self-contained
and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements():
Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overridden to return a set without this requirement if the user
intends to do this.


Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:

+1 to the idea, I don't have any 

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Aitozi
Hi Alan,
Nice FLIP, I also explored leveraging the async table function[1] to
improve the throughput before.

About the configs, what do you think about using hints as mentioned in [1]?

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

On Thu, Dec 14, 2023 at 17:29, Timo Walther wrote:

> Hi Alan,
>
> thanks for proposing this FLIP. It's a great addition to Flink and has
> been requested multiple times. It will be in particular interesting for
> accessing REST endpoints and other remote services.
>
> Great that we can generalize and reuse parts of the Python planner rules
> and code for this.
>
> I have some feedback regarding the API:
>
> 1) Configuration
>
> Configuration keys like
>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).
>
> 2) Semantical declaration
>
> Regarding
>
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>
> this is a semantical property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intends to do this.
>
>
> Thanks,
> Timo
>
>
>
>
> On 11.12.23 18:43, Piotr Nowojski wrote:
> > +1 to the idea, I don't have any comments.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:
> >
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically,
> PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems
> to
> >>> imply/suggest that all Async functions are remote.  I wonder if we can
> >> find
> >>> another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles
> Python
> >>> and Async functions seems reasonable.)
> >>>
> >> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
> >> believe that the python calls are also done asynchronously, so that
> might
> >> be a reasonable name, so long as there's no confusion between the base
> and
> >> async child class.
> >>
> >> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes  >
> >> wrote:
> >>
> >>> Hi Alan,
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically,
> PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems
> to
> >>> imply/suggest that all Async functions are remote.  I wonder if we can
> >> find
> >>> another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles
> Python
> >>> and Async functions seems reasonable.)
> >>>
> >>> Cheers,
> >>>
> >>> Jim
> >>>
> >>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> >>>  wrote:
> >>>
> >>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> >>>> asynchronous scalar function support [1]
> >>>>
> >>>> This feature proposes adding a new UDF type AsyncScalarFunction which is
> >>>> invoked just like a normal ScalarFunction, but is implemented with an
> >>>> asynchronous eval method.  I had brought this up including the motivation
> >>>> in a previous discussion thread [2].
> >>>>
> >>>> The purpose is to achieve high throughput scalar function UDFs while
> >>>> allowing that an individual call may have high latency.  It allows scaling
> >>>> up the parallelism of just these calls without having to increase the
> >>>> parallelism of the whole query (which could be rather resource
> >>>> inefficient).
> >>>>
> >>>> In practice, it should enable SQL integration with external services and
> >>>> systems, which Flink has limited support for at the moment. It should also
> >>>> allow easier integration with existing libraries which use asynchronous
> >>>> APIs.
> >>>>
> >>>> Looking forward to your feedback and suggestions.
> >>>>
> >>>> [1]
> >>>>
> >>>
> >>
>

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Alan Sheinberg
Thanks Piotr and Timo for your responses.

To address your comments Timo:

> 1) Configuration
>
> Configuration keys like
>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).


Yeah, I was trying to find similar examples and couldn't find too many,
because as you say, they aren't supported.

There are things like the metric reporters, where you can make up a name
(e.g. my_jmx_reporter), but then must list it in metrics.reporters so that
configs can all be iterated over.  Doing a similar thing here would be a
bit inelegant for this case.  I'm happy to have a global setting and a
future solution could override the global setting once we figure that out.

> 2) Semantical declaration
>
> Regarding
>
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>
> this is a semantical property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intends to do this.


That's a good point. Maybe if we had per-function configs this could make
sense, but it doesn't make as much sense when everything is global. The default
of each definition should be to get a correct result, but allowing a manual
override to say that performance is ultimately what they care about over
certain SQL order semantics is also useful.  If the user overrides
`requirements()` to omit the `ORDERED` requirement, do we allow the
operator to return out-of-order results or should it fall back on
`AsyncOutputMode.ALLOW_UNORDERED` type behavior (where we allow
out-of-order only if it's deemed correct)? Having a manual override to mean
out-of-order seems like a decent starting point and we could always add
`FunctionRequirement.ALLOW_UNORDERED` in the future to allow the more
sophisticated behavior.

Thanks,
Alan

On Thu, Dec 14, 2023 at 1:29 AM Timo Walther  wrote:

> Hi Alan,
>
> thanks for proposing this FLIP. It's a great addition to Flink and has
> been requested multiple times. It will be in particular interesting for
> accessing REST endpoints and other remote services.
>
> Great that we can generalize and reuse parts of the Python planner rules
> and code for this.
>
> I have some feedback regarding the API:
>
> 1) Configuration
>
> Configuration keys like
>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).
>
> 2) Semantical declaration
>
> Regarding
>
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>
> this is a semantical property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intends to do this.
>
>
> Thanks,
> Timo
>
>
>
>
> On 11.12.23 18:43, Piotr Nowojski wrote:
> > +1 to the idea, I don't have any comments.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:
> >
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically,
> PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems
> to
> 

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Timo Walther

Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has 
been requested multiple times. It will be in particular interesting for 
accessing REST endpoints and other remote services.


Great that we can generalize and reuse parts of the Python planner rules 
and code for this.


I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space 
should remain constant. Only a constant key space enables the use of the 
ConfigOption class which is required in the layered configuration. For 
now I would suggest to only allow a global setting for buffer capacity, 
timeout, and retry-strategy. We can later work on a per-function 
configuration (potentially also needed for other use cases).
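
To illustrate what a constant key space means in practice, here is a minimal plain-Java sketch. It does not use Flink's ConfigOption class. The global key name and the fallback value of 10 are assumptions chosen for the example, not confirmed option names or defaults.

```java
import java.util.Map;

// Hypothetical sketch of a constant key space; not Flink's ConfigOption API.
class AsyncScalarConfigSketch {

    // One fixed, global key: it can be declared once as a constant.
    // A per-function key such as
    // table.exec.async-scalar.catalog.db.func-name.buffer-capacity
    // embeds a user-chosen name, so it cannot be declared up front this way.
    static final String BUFFER_CAPACITY = "table.exec.async-scalar.buffer-capacity";

    // Reads the global setting, falling back to an assumed value of 10.
    static int bufferCapacity(Map<String, String> conf) {
        return Integer.parseInt(conf.getOrDefault(BUFFER_CAPACITY, "10"));
    }
}
```

Because the set of keys is known at compile time, each one can be documented and validated in one place, which is the property the layered configuration relies on.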


2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined 
per-function. It impacts the query result and potentially the behavior 
of planner rules.


I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully self-contained
and sufficient for planning.


Thus, for `FunctionDefinition.getRequirements():
Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overridden to return a set without this requirement if the user
intends to do this.
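
Option (b) might look roughly like the sketch below. The types are plain-Java stand-ins with hypothetical names, not Flink's FunctionRequirement or AsyncScalarFunction, and the ORDERED requirement is only the one proposed in this mail.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for a requirement enum with the proposed ORDERED value.
enum RequirementSketch { ORDERED }

// Hypothetical stand-in for the async scalar function base class.
class AsyncScalarFunctionSketch {

    // Proposed default: async scalar functions require ordered output.
    Set<RequirementSketch> getRequirements() {
        return EnumSet.of(RequirementSketch.ORDERED);
    }
}

// A user function that opts out of ordering by overriding the requirements.
class UnorderedLookupSketch extends AsyncScalarFunctionSketch {

    @Override
    Set<RequirementSketch> getRequirements() {
        return EnumSet.noneOf(RequirementSketch.class);
    }
}
```

Keeping the property on the function definition means the planner can decide on ordering from the function alone, without consulting per-function configuration keys.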



Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:

+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:



Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)


Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
believe that the python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base and
async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes 
wrote:


Hi Alan,

Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
 wrote:


I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1]

This feature proposes adding a new UDF type AsyncScalarFunction which is
invoked just like a normal ScalarFunction, but is implemented with an
asynchronous eval method.  I had brought this up including the motivation
in a previous discussion thread [2].

The purpose is to achieve high throughput scalar function UDFs while
allowing that an individual call may have high latency.  It allows scaling
up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).

In practice, it should enable SQL integration with external services and
systems, which Flink has limited support for at the moment. It should also
allow easier integration with existing libraries which use asynchronous
APIs.
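
For readers unfamiliar with the idea, an asynchronous eval method could look roughly like the plain-Java sketch below. The signature (a CompletableFuture as the first parameter) is an assumption made for illustration, since this mail does not spell it out, and the 10 ms delay merely simulates a slow external call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical example; does not extend any Flink class.
class AsyncEvalSketch {

    // Returns immediately; the result future is completed later, once the
    // simulated remote lookup has finished.
    void eval(CompletableFuture<String> result, String key) {
        CompletableFuture
                .supplyAsync(
                        () -> "value-for-" + key,
                        CompletableFuture.delayedExecutor(10, TimeUnit.MILLISECONDS))
                .thenAccept(result::complete);
    }
}
```

Since eval does not block, a caller can issue many calls before the first one completes, which is where the throughput gain for high-latency calls comes from.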

Looking forward to your feedback and suggestions.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support

[2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs


Thanks,
Alan











Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-11 Thread Piotr Nowojski
+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:

> >
> > Nicely written and makes sense.  The only feedback I have is around the
> > naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> > will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> > imply/suggest that all Async functions are remote.  I wonder if we can
> find
> > another name which doesn't carry that connotation; maybe
> > AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> > and Async functions seems reasonable.)
> >
> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
> believe that the python calls are also done asynchronously, so that might
> be a reasonable name, so long as there's no confusion between the base and
> async child class.
>
> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes 
> wrote:
>
> > Hi Alan,
> >
> > Nicely written and makes sense.  The only feedback I have is around the
> > naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> > will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> > imply/suggest that all Async functions are remote.  I wonder if we can
> find
> > another name which doesn't carry that connotation; maybe
> > AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> > and Async functions seems reasonable.)
> >
> > Cheers,
> >
> > Jim
> >
> > On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> >  wrote:
> >
> > > I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> > > asynchronous scalar function support [1]
> > >
> > > This feature proposes adding a new UDF type AsyncScalarFunction which
> is
> > > invoked just like a normal ScalarFunction, but is implemented with an
> > > asynchronous eval method.  I had brought this up including the
> motivation
> > > in a previous discussion thread [2].
> > >
> > > The purpose is to achieve high throughput scalar function UDFs while
> > > allowing that an individual call may have high latency.  It allows
> > scaling
> > > up the parallelism of just these calls without having to increase the
> > > parallelism of the whole query (which could be rather resource
> > > inefficient).
> > >
> > > In practice, it should enable SQL integration with external services
> and
> > > systems, which Flink has limited support for at the moment. It should
> > also
> > > allow easier integration with existing libraries which use asynchronous
> > > APIs.
> > >
> > > Looking forward to your feedback and suggestions.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > >
> > >
> > > [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> > > 
> > >
> > > Thanks,
> > > Alan
> > >
> >
>


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-06 Thread Alan Sheinberg
>
> Nicely written and makes sense.  The only feedback I have is around the
> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> imply/suggest that all Async functions are remote.  I wonder if we can find
> another name which doesn't carry that connotation; maybe
> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> and Async functions seems reasonable.)
>
Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
believe that the Python calls are also done asynchronously, so
AsyncCalcSplitRuleBase might be a reasonable name, so long as there's no
confusion between the base and the async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes wrote:

> Hi Alan,
>
> Nicely written and makes sense.  The only feedback I have is around the
> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> imply/suggest that all Async functions are remote.  I wonder if we can find
> another name which doesn't carry that connotation; maybe
> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> and Async functions seems reasonable.)
>
> Cheers,
>
> Jim
>
> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg wrote:
>
> > I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> > asynchronous scalar function support [1]
> >
> > This feature proposes adding a new UDF type AsyncScalarFunction which is
> > invoked just like a normal ScalarFunction, but is implemented with an
> > asynchronous eval method.  I had brought this up including the motivation
> > in a previous discussion thread [2].
> >
> > The purpose is to achieve high throughput scalar function UDFs while
> > allowing that an individual call may have high latency.  It allows scaling
> > up the parallelism of just these calls without having to increase the
> > parallelism of the whole query (which could be rather resource
> > inefficient).
> >
> > In practice, it should enable SQL integration with external services and
> > systems, which Flink has limited support for at the moment. It should also
> > allow easier integration with existing libraries which use asynchronous
> > APIs.
> >
> > Looking forward to your feedback and suggestions.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> >
> > [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> > 
> >
> > Thanks,
> > Alan
> >
>


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-06 Thread Jim Hughes
Hi Alan,

Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg wrote:

> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> asynchronous scalar function support [1]
>
> This feature proposes adding a new UDF type AsyncScalarFunction which is
> invoked just like a normal ScalarFunction, but is implemented with an
> asynchronous eval method.  I had brought this up including the motivation
> in a previous discussion thread [2].
>
> The purpose is to achieve high throughput scalar function UDFs while
> allowing that an individual call may have high latency.  It allows scaling
> up the parallelism of just these calls without having to increase the
> parallelism of the whole query (which could be rather resource
> inefficient).
>
> In practice, it should enable SQL integration with external services and
> systems, which Flink has limited support for at the moment. It should also
> allow easier integration with existing libraries which use asynchronous
> APIs.
>
> Looking forward to your feedback and suggestions.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
>
> [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> 
>
> Thanks,
> Alan
>
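
For readers skimming the thread, here is a rough sketch of what "implemented
with an asynchronous eval method" could look like, and why it helps throughput
when individual calls are slow. It is plain Java with no Flink dependency, so
nothing here is the FLIP's actual API: the class names (AsyncEvalSketch,
SlowLookup), the eval signature taking a CompletableFuture, the 50 ms
simulated latency, and the evalAll driver are all assumptions made for
illustration. The driver collects results in input order, loosely mirroring
the ordered mode discussed for V1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncEvalSketch {

    // Hypothetical stand-in for a user-defined async scalar function.
    // It is NOT the Flink AsyncScalarFunction class.
    static class SlowLookup {
        private final ExecutorService pool;

        SlowLookup(ExecutorService pool) {
            this.pool = pool;
        }

        // Asynchronous eval: returns immediately and completes the future
        // later, once the (simulated) external call has finished.
        void eval(CompletableFuture<String> result, String key) {
            pool.submit(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(50); // simulated remote latency
                    result.complete("value-for-" + key);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    result.completeExceptionally(e);
                }
            });
        }
    }

    // Hypothetical driver: issues all calls up front so they overlap,
    // then collects the results in input order.
    static List<String> evalAll(List<String> keys, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            SlowLookup fn = new SlowLookup(pool);
            List<CompletableFuture<String>> inFlight = new ArrayList<>();
            for (String key : keys) {
                CompletableFuture<String> future = new CompletableFuture<>();
                fn.eval(future, key); // does not block the caller
                inFlight.add(future);
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : inFlight) {
                results.add(future.join()); // wait in input order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(evalAll(List.of("a", "b", "c"), 3));
    }
}
```

With three keys and a pool of three threads, the simulated calls overlap
instead of running back to back, which is the effect the proposal is after:
more calls in flight without raising the parallelism of the whole query.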