Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread Kurt Young
+1 from my side.

Best,
Kurt


On Tue, Nov 30, 2021 at 5:12 PM Jingsong Li  wrote:

> Hi everyone,
>
> Many thanks to Stephan and Timo, this makes the design of the FLIP much
> clearer and more reliable.
>
> I'd like to ask you to take another look at the updated FLIP, and please
> respond directly if you have feedback.
>
> (I will contact binding voters directly to confirm)
>
> Best,
> Jingsong
>
> On Tue, Nov 30, 2021 at 4:32 PM Timo Walther  wrote:
> >
> > Thanks for the healthy discussion. Also +1 from my side for this FLIP.
> >
> > Thanks,
> > Timo
> >
> > On 24.11.21 19:05, Stephan Ewen wrote:
> > > Thanks for all the details and explanation.
> > >
> > > With the conclusion of the discussion, also +1 from my side for this
> FLIP
> > >
> > > On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li 
> wrote:
> > >
> > >> Thanks Stephan and Timo, I have taken a quick look at your replies. They are
> > >> all valuable opinions. I will take time to discuss, explain and
> > >> improve them.
> > >>
> > >> Hi Timo,
> > >>> At least a final "I will start the vote soon. Last call for comments."
> > >>> would have been nice.
> > >>
> > >> I replied in the DISCUSS thread that we had begun to vote. If there are
> > >> supplementary comments, or a reply like "please pause the voting first,
> > >> I will reply later", we can suspend or cancel the vote at any time.
> > >> I understand why a FLIP vote must stay open for three days, so that more
> > >> people can see it and put forward their opinions.
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Sat, Nov 13, 2021 at 1:27 AM Timo Walther 
> wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> even though the DISCUSS thread was open for 2 weeks, I have the feeling
> > >>> that the VOTE was initiated too quickly. At least a final "I will start
> > >>> the vote soon. Last call for comments." would have been nice.
> > >>>
> > >>> I also added some comments in the DISCUSS thread. Let's hope we can
> > >>> resolve those soon.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> On 12.11.21 16:36, Stephan Ewen wrote:
> >  Hi all!
> > 
> >  I have a few questions on the design still, posted those in the
> > >> [DISCUSS]
> >  thread.
> >  It would be great to clarify those first before concluding this
> vote.
> > 
> >  Thanks,
> >  Stephan
> > 
> > 
> >  On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:
> > 
> > > +1 (binding)
> > >
> > > Thanks for the great work Jingsong!
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 11 Nov 2021 at 19:41, JING ZHANG 
> > >> wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> A small suggestion:
> > >> The message queue is currently used to store the middle-layer data of the
> > >> streaming data warehouse. We hope to use the built-in dynamic table
> > >> storage to store those middle layers.
> > >> But the middle-layer data of the streaming data warehouse is often
> > >> provided to all business teams in a company, and some teams have not
> > >> adopted Apache Flink as their compute engine yet. In order to continue
> > >> serving those teams, the data in the built-in dynamic table storage might
> > >> need to be copied to a message queue again.
> > >> If *the built-in storage could provide the same consumer API as the
> > >> commonly used message queues*, that data copying could be avoided, so the
> > >> built-in dynamic table storage may be promoted faster in the streaming
> > >> data warehouse business.
> > >>
> > >> Best regards,
> > >> Jing Zhang
> > >>
> > >> Yufei Zhang  wrote on Thu, Nov 11, 2021 at 9:34 AM:
> > >>
> > >>> Hi,
> > >>>
> > >>> +1 (non-binding)
> > >>>
> > >>> Very interesting design. I saw a lot of discussion on the generic
> > >>> interface design, good to know it will address extensibility.
> > >>>
> > >>> Cheers,
> > >>> Yufei
> > >>>
> > >>>
> > >>> On 2021/11/10 02:51:55 Jingsong Li wrote:
> >  Hi everyone,
> > 
> >  Thanks for all the feedback so far. Based on the discussion[1]
> we
> > > seem
> >  to have consensus, so I would like to start a vote on FLIP-188
> for
> >  which the FLIP has now also been updated[2].
> > 
> >  The vote will last for at least 72 hours (Nov 13th 3:00 GMT)
> unless
> >  there is an objection or insufficient votes.
> > 
> >  [1]
> > >> https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
> >  [2]
> > >>>
> > >>
> > >
> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > 
> >  Best,
> >  Jingsong
> > 
> > >>>
> > >>
> > >
> > 
> > >>>
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-29 Thread Kurt Young
Hi,

I didn't fully read the FLIP, but the name somehow confused me. My first
impression on seeing it was that we are providing some storage for job
execution results, like the ones returned with accumulators in batch mode.
Would a name like "JobStatusStore" be more appropriate?

Best,
Kurt


On Mon, Nov 29, 2021 at 8:22 PM Zhu Zhu  wrote:

> Thanks for drafting this FLIP, Matthias, Mika and David.
>
> I like the proposed JobResultStore. Besides addressing the problem of
> re-executing finished jobs, it's also an important step towards HA of
> multi-job Flink applications.
>
> I have one question: in the "Cleanup" section, it shows that the
> JobMaster is responsible for cleaning up CheckpointCounter/CheckpointStore.
> Does this mean Flink will have to re-create
> JobMaster/Scheduler/ExecutionGraph for a terminated job to do the cleanup?
> If so, this can be heavy in certain cases because the ExecutionGraph
> creation may conduct connector initialization. So I'm thinking whether it's
> possible to make CheckpointCounter/CheckpointStore a component of
> Dispatcher?
>
> Thanks,
> Zhu
>
> Till Rohrmann  wrote on Sat, Nov 27, 2021 at 1:29 AM:
>
> > Thanks for creating this FLIP Matthias, Mika and David.
> >
> > I think the JobResultStore is an important piece for fixing Flink's last
> > high-availability problem (afaik). Once we have this piece in place,
> users
> > no longer risk re-executing a successfully completed job.
> >
> > I have one comment concerning breaking interfaces:
> >
> > If we don't want to break interfaces, then we could keep the
> > HighAvailabilityServices.getRunningJobsRegistry() method and add a
> default
> > implementation for HighAvailabilityServices.getJobResultStore(). We could
> > then deprecate the former method and then remove it in the subsequent
> > release (1.16).
> >
> > Apart from that, +1 for the FLIP.
> >
> > Cheers,
> > Till
> >
> > On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:
> >
> > > Hi everyone,
> > >
> > > Matthias, Mika and I want to start a discussion about introduction of a
> > new
> > > Flink component, the *JobResultStore*.
> > >
> > > The main motivation is to address shortcomings of the
> > *RunningJobsRegistry*
> > > and supersede it with the new component. These shortcomings have been
> first
> > > described in FLINK-11813 [1].
> > >
> > > This change should improve the overall stability of the JobManager's
> > > components and address the race conditions in some of the fail over
> > > scenarios during the job cleanup lifecycle.
> > >
> > > It should also help to ensure that Flink doesn't leave any uncleaned
> > > resources behind.
> > >
> > > We've prepared a FLIP-194 [2], which outlines the design and reasoning
> > > behind this new component.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11813
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
> > >
> > > We're looking forward to your feedback ;)
> > >
> > > Best,
> > > Matthias, Mika and David
> > >
> >
>


Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-22 Thread Kurt Young
> This is what I don't really understand here: how adding a configuration
> option causes issues here?

This is why: for most Flink production use cases I see, it's not a couple of
people managing ~5 Flink jobs who can easily track all the big changes in
every minor Flink version. The typical setup is a group of people managing a
streaming platform which provides Flink as an execution engine to their
users. Alibaba has more than 40K online streaming SQL jobs, and ByteDance
also has a similar number. Most of the time, whether to upgrade the Flink
version is controlled by the users of the platform, not the platform
provider, and the platform will most likely support multiple Flink versions.

Even if you can count on the platform providers to read all the release
notes carefully, their users won't. So we are essentially pushing the
responsibility onto the platform providers, making them take care of the
semantic changes. They have to find some good way to control the impact when
their users upgrade the Flink version. And if they don't find a good
solution around this and their users run into online issues, they will be
blamed. And you can guess who they would blame.

Flink is a very popular engine now, and every decision we make affects users
a lot. If we want them to make some changes, I would argue we should make
them think it's worth it.

Best,
Kurt


On Mon, Nov 22, 2021 at 11:29 PM Francesco Guardiani <
france...@ververica.com> wrote:

> > NULL in SQL essentially means "UNKNOWN", it's not as scary as a null in
> java which will easily cause a NPE or some random behavior with a c++
> function call.
>
> This is true from the user point of view, except our runtime doesn't treat
> null as some value where you can safely execute operations and get "noop"
> results. In our runtime null is Java's null, hence causing issues and
> generating NPEs here and there when nulls are not expected.
>
> > It will really create a big mess after users upgrade their SQL jobs
>
> This is what I don't really understand here: how adding a configuration
> option causes issues here? We make it very clear in our release notes that
> you need to switch that flag if you're relying on this behavior and that's
> it: if you reprocess jobs every time you upgrade, you just flip the switch
> before reprocessing and you won't have any issues. If you don't because you
> use the hybrid source, either you upgrade your query or you flip the flag
> and in both cases this shouldn't generate any issue.
> Since it's a big change, I also expect to keep this flag for some releases,
> at least up to Flink 2.
>
> On Sat, Nov 20, 2021 at 7:25 AM Kurt Young  wrote:
>
> > Hi Francesco,
> >
> > Thanks for sharing your opinion about this and examples with other
> > programming
> > languages. I just want to mention that NULL in the SQL world is a bit
> > different from its
> > meaning in programming languages like Java.
> >
> > NULL in SQL essentially means "UNKNOWN", it's not as scary as a null in
> > java which
> > will easily cause a NPE or some random behavior with a c++ function call.
> > UNKNOWN
> > means it could be any value. In java, the condition "null == null" always
> > return true. But
> > in SQL, it returns NULL, which means UNKNOWN.
> >
> > Another example, if you run following statements:
> > select 'true' where 3 in (1, 2, 3, null) // this will print true
> > select 'true' where 3 not in (1, 2, null) // this won't print anything
> >
> > In summary, SQL's NULL is a bit different from others, it has its own
> > meaning. So I won't
> > compare the behavior of returning NULL with programming languages and
> then
> > judge it
> > as bad behavior. And it's not a very big deal if we return NULL when
> trying
> > to cast "abc"
> > to an integer, which means we don't know the correct value.
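> >
> > To make this concrete, a small sketch of today's lenient behavior (just an
> > illustration, not a proposal):
> >
> > -- today this returns NULL instead of raising an error
> > SELECT CAST('abc' AS INT);
> > -- and the NULL then propagates silently through downstream expressions
> > SELECT CAST('abc' AS INT) + 1;  -- also NULL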
> >
> > But still, I'm ok to change the behavior, but just not now. It will
> really
> > create a big mess after
> > users upgrade their SQL jobs. I'm either fine to do it in some really big
> > version change like
> > Flink 2.0, or we can do it after we have some universal error records
> > handling mechanism, so
> > in that way, users could have a chance to handle such a situation.
> >
> > Best,
> > Kurt
> >
> >
> > On Fri, Nov 19, 2021 at 7:29 PM Francesco Guardiani <
> > france...@ververica.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > tl;dr:
> > >
> > > I think Timo pretty much said it all. As described in the issue, my
> > > proposal is:
> > >
> > > * 

Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-19 Thread Kurt Young
cation at all that the cast failed, and even if we
> push a change to log "hey this cast failed on this record" it would still
> be extremely complicated to track down how badly a single cast failure
> affected the results of a projection, a grouping, an aggregation, etc.
> Hence my definition of our CAST function as a footgun.
>
> The bottom line for me is that our CAST primitive goes directly against the
> goal of Flink SQL to provide a simple to use API for developers and
> business people to develop computation pipelines, because it's not
> explicit, it silently fails with NULLs, and we require users to deal with
> it.
>
> The very same discussion applies with TO_TIMESTAMP, which among the others
> might even be more crucial because we directly use it in our documentation
> to tell users how to compute rowtime.
>
> FG
>
> [1] Note: here the naming is a fundamental part of the issue, the function
> we have today is named CAST and not TRY_CAST or CAST_OR_NULL or any other
> name giving the indication that the operation might fail and provide a
> result different from the cast result.
>
>
> On Fri, Nov 19, 2021 at 4:00 AM Kurt Young  wrote:
>
> > Hi Timo,
> >
> > Regarding CAST, I think no one denies the standard behavior, which should
> > raise errors when the cast fails. The only question is how we solve it,
> > given lots of users are already relying on the current, more tolerant
> > behavior. A violation of the standard with otherwise acceptable behavior
> > doesn't deserve a breaking change in a Flink minor version IMO; I'm more
> > comfortable fixing it in a version like Flink 2.0.
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Nov 18, 2021 at 11:44 PM Timo Walther 
> wrote:
> >
> > > Hi everyone,
> > >
> > >
> > > thanks for finally have this discussion on the mailing list. As both a
> > > contributor and user, I have experienced a couple issues around
> > > nullability coming out of nowhere in a pipeline. This discussion should
> > > not only cover CAST but failure handling in general.
> > >
> > > Let me summarize my opinion:
> > >
> > > 1) CAST vs. TRY_CAST
> > >
> > > CAST is a SQL standard core operation with well-defined semantics
> across
> > > all major SQL vendors. There should be no discussion whether it returns
> > > NULL or an error. The semantics are already defined externally. I don't
> > > agree with "Streaming computing is a resident program ... users do not
> > > want it to frequently fail", the same argument is also true for nightly
> > > batch jobs. A batch job can also get stuck through a SQL statement that
> > > is not defined leniently enough by the user.
> > >
> > > An option that restores the old behavior and TRY_CAST for the future
> > > should solve this use case and make all parties happy.
> > >
> > > 2) TO_TIMESTAMP / TO_DATE
> > >
> > > We should distinguish between CASTING and CONVERSION / PARSING. As a
> > > user, I would expect that parsing can fail and have to deal with this
> > > accordingly. Therefore, I'm fine with returning NULL in TO_ or CONVERT_
> > > functions. This is also consistent with other vendors. Take PARSE of
> SQL
> > > Server as an example [1]: "If a parameter with a null value is passed
> at
> > > run time, then a null is returned, to avoid canceling the whole
> batch.".
> > > Here we can be more flexible with the semantics because users need to
> > > read the docs anyway.
> > >
> > > 3) Null at other locations
> > >
> > > In general, we should stick to our data type constraints. Everything
> > > else will mess up the architecture of functions/connectors and their
> > > return types. Take the rowtime (event-time timestamp) attribute as an
> > > example: PRs like the one for FLINK-24885 are just the tip of the
> > > iceberg. If we would allow rowtime columns to be NULL we would need to
> > > check all time-based operators and implement additional handling logic
> > > for this.
> > >
> > > It would be better to define unified error-handling for operators and
> > > maybe drop rows if the per-element processing failed. We should have a
> > > unified approach how to log/side output such records.
> > >
> > > Until this is in place, I would suggest we spend some time on rules
> that
> > > can be enabled with an option for modifying the plan and wrap
> frequently
> > > failing

Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-18 Thread Kurt Young
Hi Timo,

Regarding CAST, I think no one denies the standard behavior, which should
raise errors when the cast fails. The only question is how we solve it,
given lots of users are already relying on the current, more tolerant
behavior. A violation of the standard with otherwise acceptable behavior
doesn't deserve a breaking change in a Flink minor version IMO; I'm more
comfortable fixing it in a version like Flink 2.0.

Best,
Kurt


On Thu, Nov 18, 2021 at 11:44 PM Timo Walther  wrote:

> Hi everyone,
>
>
> thanks for finally have this discussion on the mailing list. As both a
> contributor and user, I have experienced a couple issues around
> nullability coming out of nowhere in a pipeline. This discussion should
> not only cover CAST but failure handling in general.
>
> Let me summarize my opinion:
>
> 1) CAST vs. TRY_CAST
>
> CAST is a SQL standard core operation with well-defined semantics across
> all major SQL vendors. There should be no discussion whether it returns
> NULL or an error. The semantics are already defined externally. I don't
> agree with "Streaming computing is a resident program ... users do not
> want it to frequently fail", the same argument is also true for nightly
> batch jobs. A batch job can also get stuck through a SQL statement that
> is not defined leniently enough by the user.
>
> An option that restores the old behavior and TRY_CAST for the future
> should solve this use case and make all parties happy.
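>
> For illustration, a rough sketch of how the two would differ under this
> proposal (TRY_CAST is the name proposed here, not something that exists in
> Flink today):
>
> SELECT CAST('abc' AS INT);      -- would raise an error, per the SQL standard
> SELECT TRY_CAST('abc' AS INT);  -- would return NULL, i.e. today's lenient behavior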
>
> 2) TO_TIMESTAMP / TO_DATE
>
> We should distinguish between CASTING and CONVERSION / PARSING. As a
> user, I would expect that parsing can fail and have to deal with this
> accordingly. Therefore, I'm fine with returning NULL in TO_ or CONVERT_
> functions. This is also consistent with other vendors. Take PARSE of SQL
> Server as an example [1]: "If a parameter with a null value is passed at
> run time, then a null is returned, to avoid canceling the whole batch.".
> Here we can be more flexible with the semantics because users need to
> read the docs anyway.
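>
> As a small sketch of what that means in practice with the current built-ins:
>
> SELECT TO_TIMESTAMP('not-a-timestamp');      -- parsing fails, returns NULL
> SELECT TO_TIMESTAMP('2021-11-18 12:00:00');  -- returns the parsed TIMESTAMP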
>
> 3) Null at other locations
>
> In general, we should stick to our data type constraints. Everything
> else will mess up the architecture of functions/connectors and their
> return types. Take the rowtime (event-time timestamp) attribute as an
> example: PRs like the one for FLINK-24885 are just the tip of the
> iceberg. If we would allow rowtime columns to be NULL we would need to
> check all time-based operators and implement additional handling logic
> for this.
>
> It would be better to define unified error-handling for operators and
> maybe drop rows if the per-element processing failed. We should have a
> unified approach how to log/side output such records.
>
> Until this is in place, I would suggest we spend some time on rules that
> can be enabled with an option for modifying the plan and wrap frequently
> failing expressions with a generic TRY() function. In this case, we
> don't need to deal with NULL in all built-in functions, we can throw
> helpful errors during development, and can return NULL even though the
> return type is NOT NULL. It would also make the NULL returning explicit
> in the plan.
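>
> As a sketch of that idea (TRY() is only the hypothetical wrapper proposed
> here, and the table/column names are made up):
>
> -- a plan rule could rewrite a frequently failing expression into e.g.
> SELECT TRY(CAST(s AS INT)) FROM t;  -- NULL on failure instead of an error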
>
> Regards,
> Timo
>
>
>
>
>
> [1]
>
> https://docs.microsoft.com/en-us/sql/t-sql/functions/parse-transact-sql?view=sql-server-ver15
> [2] https://issues.apache.org/jira/browse/FLINK-24885
>
>
>
>
>
> On 18.11.21 11:34, Kurt Young wrote:
> > Sorry, I forgot to add the user ML. I would also like to gather some user
> > feedback on this, since I haven't gotten any feedback on this topic from
> > users before.
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Nov 18, 2021 at 6:33 PM Kurt Young  wrote:
> >
> >> (added user ML to this thread)
> >>
> >> Hi all,
> >>
> >> I would like to raise a different opinion about this change. I agree
> with
> >> Ingo that
> >> we should not just break some existing behavior, and even if we
> introduce
> >> an
> >> option to control the behavior, I would propose to set the default value
> >> to current
> >> behavior.
> >>
> >> I want to mention one angle to assess whether we should change it or
> not,
> >> which
> >> is "what could users benefit from the changes". To me, it looks like:
> >>
> >> * new users: happy about the behavior
> >> * existing users: suffer from the change; it either forces them to modify
> >> their SQL or gets them a call late at night reporting that their online
> >> job crashed and couldn't be restarted.
> >>
> >> I would like to quote another breaking cha

Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-18 Thread Kurt Young
Sorry, I forgot to add the user ML. I would also like to gather some user
feedback on this, since I haven't gotten any feedback on this topic from
users before.

Best,
Kurt


On Thu, Nov 18, 2021 at 6:33 PM Kurt Young  wrote:

> (added user ML to this thread)
>
> Hi all,
>
> I would like to raise a different opinion about this change. I agree with
> Ingo that
> we should not just break some existing behavior, and even if we introduce
> an
> option to control the behavior, I would propose to set the default value
> to current
> behavior.
>
> I want to mention one angle to assess whether we should change it or not,
> which
> is "what could users benefit from the changes". To me, it looks like:
>
> * new users: happy about the behavior
> * existing users: suffer from the change; it either forces them to modify
> their SQL or gets them a call late at night reporting that their online job
> crashed and couldn't be restarted.
>
> I would like to point to another breaking change we made when we adjusted
> the time-related functions in FLIP-162 [1]. In that case, both new and
> existing users suffered from the *incorrectly* implemented time function
> behavior, and we saw a lot of feedback and complaints from various channels.
> After we fixed that, we never saw related problems again.
>
> Back to this topic, do we ever seen a user complain about current CAST
> behavior? Form my
> side, no.
>
> To summarize:
>
> +1 to introduce TRY_CAST to better prepare for the future.
> -1 to modify the default behavior.
> +0 to introduce a config option, but with the default value set to the
> existing behavior. It's +0 because it seems unnecessary given that I'm -1 on
> changing the default behavior, and I also don't see an urgent need to modify it.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
>
> Best,
> Kurt
>
>
> On Thu, Nov 18, 2021 at 4:26 PM Ingo Bürk  wrote:
>
>> Hi,
>>
>> first of all, thanks for the summary of both sides, and for bringing up
>> the
>> discussion on this.
>> I think it is obvious that this is not something we can just "break", so
>> the config option seems mandatory to me.
>>
>> Overall I agree with Martijn and Till that throwing errors is the more
>> expected behavior. I mostly think this is valuable default behavior
>> because
>> it allows developers to find mistakes early and diagnose them much easier
>> compared to having to "work backwards" and figure out that it is the CAST
>> that failed. It also means that pipelines using TRY_CAST are
>> self-documenting because using that can signal "we might receive broken
>> data here".
>>
>>
>> Best
>> Ingo
>>
>> On Thu, Nov 18, 2021 at 9:11 AM Till Rohrmann 
>> wrote:
>>
>> > Hi everyone,
>> >
>> > personally I would also prefer the system telling me that something is
>> > wrong instead of silently ignoring records. If there is a TRY_CAST
>> function
>> > that has the old behaviour, people can still get the old behaviour. For
>> > backwards compatibility reasons it is a good idea to introduce a switch
>> to
>> > get back the old behaviour. By default we could set it to the new
>> > behaviour, though. Of course, we should explicitly document this new
>> > behaviour so that people are aware of it before running their jobs for
>> days
>> > and then encountering an invalid input.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Thu, Nov 18, 2021 at 9:02 AM Martijn Visser 
>> > wrote:
>> >
>> > > Hi Caizhi,
>> > >
>> > > Thanks for bringing this up for discussion. I think the important
>> part is
>> > > what do developers expect as the default behaviour of a CAST function
>> > when
>> > > casting fails. If I look at Postgres [1] or MSSQL [2], the default
>> > > behaviour of a CAST failing would be to return an error, which would
>> be
>> > the
>> > > new behaviour. Returning a value when a CAST fails can lead to users
>> not
>> > > understanding immediately where that value comes from. So, I would be
>> in
>> > > favor of the new behaviour by default, but including a configuration
>> flag
>> > > to maintain the old behaviour to avoid that you need to rewrite all
>> these
>> > > jobs.
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > > [1] htt

Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-18 Thread Kurt Young
(added user ML to this thread)

Hi all,

I would like to raise a different opinion about this change. I agree with
Ingo that we should not just break existing behavior, and even if we
introduce an option to control the behavior, I would propose to set the
default value to the current behavior.

I want to mention one angle to assess whether we should change it or not,
which is "what could users benefit from the change". To me, it looks like:

* new users: happy about the behavior
* existing users: suffer from the change; it either forces them to modify
their SQL or gets them a call late at night reporting that their online job
crashed and couldn't be restarted.

I would like to point to another breaking change we made when we adjusted
the time-related functions in FLIP-162 [1]. In that case, both new and
existing users suffered from the *incorrectly* implemented time function
behavior, and we saw a lot of feedback and complaints from various channels.
After we fixed that, we never saw related problems again.

Back to this topic, have we ever seen a user complain about the current CAST
behavior? From my side, no.

To summarize:

+1 to introducing TRY_CAST to better prepare for the future.
-1 to modifying the default behavior.
+0 to introducing a config option, but with the default value set to the
existing behavior. It's +0 because it seems unnecessary given that I'm -1 on
changing the default behavior, and I also don't see an urgent need to modify
it.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior

Best,
Kurt


On Thu, Nov 18, 2021 at 4:26 PM Ingo Bürk  wrote:

> Hi,
>
> first of all, thanks for the summary of both sides, and for bringing up the
> discussion on this.
> I think it is obvious that this is not something we can just "break", so
> the config option seems mandatory to me.
>
> Overall I agree with Martijn and Till that throwing errors is the more
> expected behavior. I mostly think this is valuable default behavior because
> it allows developers to find mistakes early and diagnose them much easier
> compared to having to "work backwards" and figure out that it is the CAST
> that failed. It also means that pipelines using TRY_CAST are
> self-documenting because using that can signal "we might receive broken
> data here".
>
>
> Best
> Ingo
>
> On Thu, Nov 18, 2021 at 9:11 AM Till Rohrmann 
> wrote:
>
> > Hi everyone,
> >
> > personally I would also prefer the system telling me that something is
> > wrong instead of silently ignoring records. If there is a TRY_CAST
> function
> > that has the old behaviour, people can still get the old behaviour. For
> > backwards compatibility reasons it is a good idea to introduce a switch
> to
> > get back the old behaviour. By default we could set it to the new
> > behaviour, though. Of course, we should explicitly document this new
> > behaviour so that people are aware of it before running their jobs for
> days
> > and then encountering an invalid input.
> >
> > Cheers,
> > Till
> >
> > On Thu, Nov 18, 2021 at 9:02 AM Martijn Visser 
> > wrote:
> >
> > > Hi Caizhi,
> > >
> > > Thanks for bringing this up for discussion. I think the important part
> is
> > > what do developers expect as the default behaviour of a CAST function
> > when
> > > casting fails. If I look at Postgres [1] or MSSQL [2], the default
> > > behaviour of a CAST failing would be to return an error, which would be
> > the
> > > new behaviour. Returning a value when a CAST fails can lead to users
> not
> > > understanding immediately where that value comes from. So, I would be
> in
> > > favor of the new behaviour by default, but including a configuration
> flag
> > > to maintain the old behaviour to avoid that you need to rewrite all
> these
> > > jobs.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > [1] https://www.postgresql.org/docs/current/sql-createcast.html
> > > [2]
> > >
> > >
> >
> https://docs.microsoft.com/en-us/sql/t-sql/functions/try-cast-transact-sql?view=sql-server-ver15
> > >
> > > On Thu, 18 Nov 2021 at 03:17, Caizhi Weng 
> wrote:
> > >
> > > > Hi devs!
> > > >
> > > > We're discussing the behavior of casting functions (including cast,
> > > > to_timestamp, to_date, etc.) for invalid input in
> > > > https://issues.apache.org/jira/browse/FLINK-24924. As this topic is
> > > > crucial
> > > > to compatibility and usability we'd like to continue discussing this
> > > > publicly in the mailing list.
> > > >
> > > > The main topic is to discuss whether casting functions shall return
> > > > null (keeping the current behavior) or throw exceptions (introducing a
> > > > new behavior). I'm trying to summarize the ideas on both sides. Correct
> > > > me if I miss something.
> > > >
> > > > *From the devs who support throwing exceptions (new behavior):*
> > > >
> > > > The main concern is that if we silently return null then unexpected
> > > results
> > > > or exceptions (mainly NullPointerException) may be produced. However,
> > it
> > > > will be hard 

Re: [ANNOUNCE] New Apache Flink Committer - Jing Zhang

2021-11-15 Thread Kurt Young
Congrats Jing!

Best,
Kurt

On Tue, Nov 16, 2021 at 9:52 AM Xintong Song  wrote:

> Congratulations~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Nov 16, 2021 at 9:50 AM Dian Fu  wrote:
>
> > Congratulations!
> >
> > Regards,
> > Dian
> >
> > On Tue, Nov 16, 2021 at 9:48 AM godfrey he  wrote:
> >
> > > Congrats & well deserved, Jing!
> > >
> > > Best,
> > > Godfrey
> > >
> > > > Leonard Xu  wrote on Tue, Nov 16, 2021 at 12:11 AM:
> > > >
> > > > Congratulations Jing! Well Deserved.
> > > >
> > > >
> > > > > On Nov 15, 2021, at 22:30, Dawid Wysakowicz  wrote:
> > > > >
> > > > > Congrats!
> > > > >
> > > > > On 15/11/2021 14:39, Timo Walther wrote:
> > > > >> Hi everyone,
> > > > >>
> > > > >> On behalf of the PMC, I'm very happy to announce Jing Zhang as a
> new
> > > > >> Flink committer.
> > > > >>
> > > > >> Jing has been very active in the Flink community esp. in the
> > Table/SQL
> > > > >> area for quite some time: 81 PRs [1] in total and is also active
> on
> > > > >> answering questions on the user mailing list. She is currently
> > > > >> contributing a lot around the new windowing table-valued functions
> > > [2].
> > > > >>
> > > > >> Please join me in congratulating Jing Zhang for becoming a Flink
> > > > >> committer!
> > > > >>
> > > > >> Thanks,
> > > > >> Timo
> > > > >>
> > > > >> [1] https://github.com/apache/flink/pulls/beyond1920
> > > > >> [2] https://issues.apache.org/jira/browse/FLINK-23997
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-09 Thread Kurt Young
+1 (binding)

Best,
Kurt


On Wed, Nov 10, 2021 at 10:52 AM Jingsong Li  wrote:

> Hi everyone,
>
> Thanks for all the feedback so far. Based on the discussion[1] we seem
> to have consensus, so I would like to start a vote on FLIP-188 for
> which the FLIP has now also been updated[2].
>
> The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
> there is an objection or insufficient votes.
>
> [1] https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
>
> Best,
> Jingsong
>


Re: [DISCUSS] FLIP-189: SQL Client Usability Improvements

2021-11-01 Thread Kurt Young
Really cool improvements @Sergey. Can't wait to see it happen.

Best,
Kurt


On Tue, Nov 2, 2021 at 1:56 AM Martijn Visser  wrote:

> Hi Sergey,
>
> I guess you've just set a new standard ;-) I agree with Ingo, these
> improvements look really good!
>
> Best regards,
>
> Martijn
>
> On Mon, 1 Nov 2021 at 18:23, Ingo Bürk  wrote:
>
> > Hi Sergey,
> >
> > I think those improvements look absolutely amazing. Thanks for the little
> > video!
> >
> >
> > Best
> > Ingo
> >
> > On Mon, Nov 1, 2021, 17:15 Sergey Nuyanzin  wrote:
> >
> > > Thanks for the feedback Till.
> > >
> > > Martijn, I have created a short demo showing some of the features
> > mentioned
> > > in FLIP.
> > > It is available at https://asciinema.org/a/446247?speed=3.0
> > > Could you please tell if it is what you are expecting or not?
> > >
> > > On Fri, Oct 29, 2021 at 4:59 PM Till Rohrmann 
> > > wrote:
> > >
> > > > Thanks for creating this FLIP Sergey. I think what you propose sounds
> > > like
> > > > very good improvements for the SQL client. This should make the
> client
> > a
> > > > lot more ergonomic :-)
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Oct 29, 2021 at 11:26 AM Sergey Nuyanzin <
> snuyan...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Martijn,
> > > > >
> > > > > Thank you for your suggestion with POC.
> > > > > Yes I will do that and come back to this thread probably after the
> > > > weekend
> > > > >
> > > > > On Thu, Oct 28, 2021 at 4:38 PM Martijn Visser <
> > mart...@ververica.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Sergey,
> > > > > >
> > > > > > Thanks for taking the initiative to create a FLIP and propose
> > > > > improvements
> > > > > > on the SQL client. All usability improvements on the SQL client
> are
> > > > > highly
> > > > > > appreciated, especially for new users of Flink. Multi-line
> support
> > is
> > > > > > definitely one of those things I've run into myself.
> > > > > >
> > > > > > I do think it would be quite nice if there would be some kind of
> > POC
> > > > > which
> > > > > > could show (some of) the proposed improvements. Is that something
> > > that
> > > > > > might be easily feasible?
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Martijn
> > > > > >
> > > > > > On Thu, 28 Oct 2021 at 11:02, Sergey Nuyanzin <
> snuyan...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I want to start a discussion about FLIP-189: SQL Client
> Usability
> > > > > > > Improvements.
> > > > > > >
> > > > > > > The main changes in this FLIP:
> > > > > > >
> > > > > > > - Flink sql client parser improvements so
> > > > > > >that sql client does not ask for ; inside a quoted string
> or a
> > > > > comment
> > > > > > > - use prompt to show what sql client is waiting for
> > > > > > > - introduce syntax highlighting
> > > > > > > - improve completion
> > > > > > >
> > > > > > > For more detailed changes, please refer to FLIP-189[1].
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-189%3A+SQL+Client+Usability+Improvements
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Look forward to your feedback.
> > > > > > >
> > > > > > > --
> > > > > > > Best regards,
> > > > > > > Sergey
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best regards,
> > > > > Sergey
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best regards,
> > > Sergey
> > >
> >
>


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-31 Thread Kurt Young
Hi Till,

We have discussed offline with Stephan and Timo the possibility of putting
this FLIP into another repository. This looks similar to another ongoing
effort which is trying to move all connectors outside the Flink core
repository.

From the motivation and scope of this FLIP, it's quite different from
current connectors in some aspects. What we are trying to offer is some kind
of built-in storage, or we could call it internal/managed tables; compared
with that, current connectors essentially express external tables of Flink
SQL. Functionality-wise, this managed table would have more abilities than
all these connectors, since we control the implementation of the storage.
Thus this table storage will interact with lots of SQL components, like
metadata handling, optimization, execution, etc.

However, we do see some potential benefits if we choose to put it outside
Flink:
- We may achieve a more rapid development speed and maybe more frequent
releases.
- It forces us to think really clearly about what the interfaces should be,
because we don't have the shortcut of modifying core & connector code all at
the same time.

But we also can't ignore the overhead:
- We need almost everything that is discussed in the splitting-connectors
thread.
- We have to create lots more interfaces than TableSource/TableSink to make
it work in the first place, e.g. interfaces to express that such tables
should be managed by Flink, and interfaces to express the physical
capabilities of the storage so it can be bridged to the SQL optimizer and
executor.
- If we create lots of interfaces with only one implementation, that sounds
like overengineering to me.

Combining the pros and cons above, what we are trying to do is to first
implement it in a feature branch, while keeping good engineering and design
in mind. At some point we will re-evaluate the decision of whether to put it
inside or outside the Flink core. What do you think?

Best,
Kurt


On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann  wrote:

> Hi Jingsong,
>
> Thanks for creating this FLIP. I don't have a lot to add because I am not
> very familiar with the SQL components. While reading the FLIP I was
> wondering what would we need in Flink to build something like the BDT
> feature outside of Flink as a kind of extension? Would something like this
> be possible? Maybe the answer is a quick no ;-)
>
> Cheers,
> Till
>
> On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > I updated FLIP based on your feedback:
> >
> > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> > TableDescriptor.forManaged
> >
> > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
> >
> > 3. Add description to miss dropped table.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li 
> > wrote:
> > >
> > > Hi Ingo,
> > >
> > > Really appreciate your feedback.
> > >
> > > #1. The reason why we insist on using no "connector" option is that we
> > > want to bring the following design to users:
> > > - With the "connector" option, it is a mapping, unmanaged table.
> > > - Without the "connector" option, it is a managed table. It may be an
> > > Iceberg managed table, or may be a JDBC managed table, or may be a
> > > Flink managed table.
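> > >
> > > To make #1 concrete, a rough sketch in DDL form (table names are only
> > > illustrative):
> > >
> > > -- without a 'connector' option: a managed table whose storage is owned
> > > -- by Flink
> > > CREATE TABLE managed_t (f0 INT);
> > >
> > > -- with a 'connector' option: a mapping to an external, unmanaged table
> > > CREATE TABLE external_t (f0 INT) WITH ('connector' = '…');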
> > >
> > > #2. About:
> > > CREATE TABLE T (f0 INT);
> > > ALTER TABLE T SET ('connector' = '…');
> > >
> > > I think it is dangerous, even for a generic table. The managed table
> > > should prohibit it.
> > >
> > > #3. DDL and Table API
> > >
> > > You are right, Table Api should be a superset of SQL. There is no
> > > doubt that it should support BDT.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk  wrote:
> > > >
> > > > Hi Jingsong,
> > > >
> > > > thanks again for the answers. I think requiring catalogs to implement
> > an
> > > > interface to support BDTs is something we'll need (though personally
> I
> > > > still prefer explicit DDL here over the "no connector option"
> > approach).
> > > >
> > > > What about more edge cases like
> > > >
> > > > CREATE TABLE T (f0 INT);
> > > > ALTER TABLE T SET ('connector' = '…');
> > > >
> > > > This would have to first create the physical storage and then delete
> it
> > > > again, right?
> > > >
> > > > On a separate note, the FLIP currently only discusses SQL DDL, and you
> > have
> > > > also mentioned
> > > >
> > > > > BDT only can be dropped by Flink SQL DDL now.
> > > >
> > > > Something Flink suffers from a lot is inconsistencies across APIs. I
> > think
> > > > it is important that we support features on all major APIs, i.e.
> > including
> > > > Table API.
> > > > For example for creating a BDT this would mean e.g. adding something
> > like
> > > > #forManaged(…) to TableDescriptor.
> > > >
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li 
> > wrote:
> > > >
> > > > > Hi Ingo,
> > > > >
> > > > > I thought again.
> > > > >
> > > > > I'll try to sort out the current catalog behaviors.
> > > 

Re: [VOTE] Release 1.14.0, release candidate #1

2021-09-15 Thread Kurt Young
I noticed that a serious performance degradation has been reported [1];
shall we wait
for the conclusion on that issue?

[1] https://issues.apache.org/jira/browse/FLINK-24300

Best,
Kurt


On Wed, Sep 15, 2021 at 3:29 PM Dawid Wysakowicz 
wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.14.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint 31D2DD10BFC15A2D [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.14.0-rc1" [5],
> * website pull request listing the new release and adding announcement
> blog post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Xintong, Joe, Dawid
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349614
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.14.0-rc1
> 
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1449
> [5] https://github.com/apache/flink/tree/release-1.14.0-rc1
> [6] https://github.com/apache/flink-web/pull/466
>


Re: One idea for Graceful Application Evolvement

2021-09-02 Thread Kurt Young
 Could you explain why you need a backfill after you take v2 into
production?

Best,
Kurt


On Fri, Sep 3, 2021 at 2:02 AM zhihao wang  wrote:

> Hi team
>
> Graceful Application Evolvement is a hard and open problem for the
> community. We have met this problem in our production, too. To address it,
> we are planning to leverage a backfill-based approach with a self-built
> management system. We'd like to hear the community's feedback on this
> approach.
>
> Our job structure is like this: Kafka Inputs -> Flink SQL v1 -> Kafka
> Output. We need to keep the Kafka Output interface / address unchanged to
> clients. We perform a code change in three steps:
>
> 1. *Develop Step:* The client launches a new Flink job to update the code
> from Flink SQL v1 to Flink SQL v2 with structure:  Kafka Inputs -> Flink
> SQL v2 -> TEMP Kafka. The new job will read the production inputs and write
> into an auto-generated temporary Kafka topic so that there is no pollution
> to the Flink SQL v1 job.
>
> 2. *Deploy Step*: When the client has tested thoroughly and thinks Flink
> SQL v2 is ready to be promoted to production (the completeness is judged
> manually by clients), the client can deploy Flink SQL v2 logic to
> production in one click. Behind the scene, the system will automatically do
> the following actions in sequence:
> 2.1. The system will take a savepoint of Flink SQL v2 job which
> contains all its internal states.
> 2.2. The system will stop the Flink SQL v2 job and Flink SQL v1 job.
> 2.3. The system will create a new production ready job with structure
>  Kafka Inputs -> Flink SQL v2 -> Kafka output from 2.1's savepoint.
>
> 3. *Backfill Step*: After Deploy Step is done, the Flink SQL v2 is already
> in production and serves the latest traffic. It’s at the client’s
> discretion on when and how fast to perform a backfill to correct all the
> records.
>
> 3.1. Here we need a special form of backfill: For the Flink job, given
> one key in the schema of , the backfill will 1) send a
> Refresh Record e.g. UPSERT  to clients if the key exists
> in Flink states. 2) send a Delete Record e.g. DELETE to clients
> if the key doesn't exist in Flink states.
> 3.2. The system will backfill all the records of two sinks in Deploy
> Step  and . The backfill will either refresh
> client records’ states or clean up clients’ stale records.
> 3.3. After the backfill is completed, the  will be
> destroyed automatically by the system.
>
> Please let us know your opinions on this approach.
>
> Regards
> Zhihao
>


[ANNOUNCE] New PMC member: Guowei Ma

2021-07-06 Thread Kurt Young
Hi all!

I'm very happy to announce that Guowei Ma has joined the Flink PMC!

Congratulations and welcome Guowei!

Best,
Kurt


Re: [ANNOUNCE] Criteria for merging pull requests is updated

2021-07-02 Thread Kurt Young
It seems disabling the merge button was only proposed for the release
testing phase, which IMO doesn't
mean we can never use it.

Best,
Kurt


On Fri, Jul 2, 2021 at 3:01 PM Xintong Song  wrote:

> It was part of the draft proposed in this mail [1]. And before that, it was
> brought up several times in both ML discussions [2][3] and IIRC offline
> release syncs.
>
> If that is too implicit and there are objections, I'm open to keeping on
> that discussion.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://lists.apache.org/thread.html/r09c4b8a03bc431adb5d7eaa17cb8e849f16da7a802b20798f32235cc%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://lists.apache.org/thread.html/r76e1cdba577c6f4d6c86b23fdaeb53c4e3744c20d0b3e850fc2e14a7%40%3Cdev.flink.apache.org%3E
> [3]
>
> https://lists.apache.org/thread.html/r25ed92303cdefe41cdcc2935c2b06040b1bc7590ded01a26506a1e49%40%3Cdev.flink.apache.org%3E
>
> On Fri, Jul 2, 2021 at 2:19 PM Chesnay Schepler 
> wrote:
>
> > > - SHOULD NOT use the GitHub UI to merge PRs
> >
> > Where was this discussed?
> >
> >
> > On 7/2/2021 6:59 AM, Xintong Song wrote:
> > > Hi Flink committers,
> > >
> > > As previously discussed [1], the criteria for merging pull requests has
> > > been updated.
> > >
> > > A full version of guidelines can be found on the project wiki [2]. The
> > > following are some of the highlights.
> > > - MUST make sure passing the CI tests before merging PRs
> > > - SHOULD NOT use the GitHub UI to merge PRs
> > > - For frequent test instabilities that are temporarily disabled, the
> > > corresponding JIRA tickets must be made BLOCKER
> > >
> > > I'd like to kindly ask all Flink committers to please read through the
> > new
> > > guidelines and merge PRs accordingly.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> >
> https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E
> > >
> > > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> > >
> >
> >
>


Re: [DISCUSS] Feedback Collection Jira Bot

2021-06-30 Thread Kurt Young
+1 to Stephan's opinion, with just one minor difference. From my experience,
in a project as big as Flink, picking up an issue created 1-2 years ago seems
normal. To be more specific, during the blink planner merge, I created lots
of clean-up & refactoring issues, trying to make the code cleaner. I haven't
had a chance to resolve all of these, but I think they are still good
improvements. Thus I would propose we don't close any stale issues for at
least 2 years.

Best,
Kurt


On Wed, Jun 30, 2021 at 7:22 AM Stephan Ewen  wrote:

> Being a bit late to the party, and don't want to ask to change everything,
> just maybe some observation.
>
> My main observation and concern is still that this puts pressure and
> confusion on contributors, which are mostly blocked on committers for
> reviews, or are taking tickets as multi-week projects. I think it is not a
> great experience for contributors, when they are already unsure why their
> work isn't getting the attention from committers that they hoped for, to
> then see issues unassigned or deprioritized automatically. I think we
> really need to weigh this discouragement of contributors against the desire
> for a tidy ticket system.
> I also think by now this isn't just a matter of phrasing the bot's message
> correctly. Auto unassignment and deprioritization sends a subtle message
> that jira resolution is a more important goal than paying attention to
> contributors (at least I think that is how it will be perceived by many).
>
> Back to the original motivation, to not have issues lying around forever,
> ensuring there is closure eventually.
> For that, even much longer intervals would be fine. Like pinging every
> three months, closing after three pings - would resolve most tickets in a
> year, which is not too bad in the scope of a project like Flink. Many
> features/wishes easily move to two releases in the future, which is almost
> a year. We would get rid of long dead tickets and interfere little with
> current tickets. Contributors can probably understand ticket closing after
> a year of inactivity.
>
> I am curious if a very simple bot that really just looks at stale issues
> (no comment/change in three months), pings the
> issue/reporter/assignee/watchers and closes it after three pings would do
> the job.
> We would get out of the un-assigning business (which can send very tricky
> signals) and would rely on reporters/assignees/watchers to unassign if they
> see that the contributor abandoned the issue. With a cadence of three
> months for pinging, this isn't much work for the ones that get pinged.
>
> Issues where we rely on faster handling are probably the ones where
> committers have a stake in getting those into an upcoming release, so these
> tend to be watched anyways.
>
> On Wed, Jun 23, 2021 at 2:39 PM JING ZHANG  wrote:
>
> > Hi Konstantin, Chesnay,
> >
> > > I would like it to not unassign people if a PR is open. These are
> > > usually blocked by the reviewer, not the assignee, and having the
> > > assignees now additionally having to update JIRA periodically is a bit
> > > like rubbing salt into the wound.
> >
> > I agree with Chesnay about not un-assign an issue if a PR is open.
> > Besides, could assignees remove the "stale-assigned" tag by themselves? It
> > seems assignees have no permission to delete the tag if the issue is not
> > created by themselves.
> >
> > Best regards,
> > JING ZHANG
> >
> > Konstantin Knauf  wrote on Wed, Jun 23, 2021 at 4:17 PM:
> >
> > > > I agree there are such tickets, but I don't see how this is
> addressing
> > my
> > > concerns. There are also tickets that just shouldn't be closed as I
> > > described above. Why do you think that duplicating tickets and losing
> > > discussions/knowledge is a good solution?
> > >
> > > I don't understand why we are necessarily losing discussion/knowledge.
> > The
> > > tickets are still there, just in "Closed" state, which are included in
> > > default Jira search. We could of course just add a label, but closing
> > seems
> > > clearer to me given that likely this ticket will not get comitter
> > attention
> > > in the foreseeable future.
> > >
> > > > I would like to avoid having to constantly fight against the bot.
> It's
> > > already responsible for the majority of my daily emails, with quite
> > little
> > > benefit for me personally. I initially thought that after some period
> of
> > > time it will settle down, but now I'm afraid it won't happen.
> > >
> > > Can you elaborate which rules you are running into mostly? I'd rather
> > like
> > > to understand how we work right now and where this conflicts with the
> > Jira
> > > bot vs slowly disabling the jira bot via labels.
> > >
> > > On Wed, Jun 23, 2021 at 10:00 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Konstantin,
> > > >
> > > > > In my opinion it is important that we close tickets eventually.
> There
> > > are
> > > > a
> > > > > lot of tickets (bugs, improvements, tech debt) that over time
> became
> > > > > 

Re: [Discuss] Planning Flink 1.14

2021-06-02 Thread Kurt Young
Thanks for bringing this up.

I have one thought about the release period. In short: shall we try to
extend the release period by 1 month?

There are a couple of reasons why I want to bring up this proposal.

1) I have observed that lots of users are actually far behind the current
Flink version. For example, we are now actively developing 1.14, but most
users I know who have a migration or upgrade plan are planning to upgrade to
1.12. This means we need to backport bug fixes to 1.12 and 1.13. If we
extend the release period by 1 month, I think there is a chance that users
will have a proper time frame to upgrade to the previously released version.
Then we can have a good development cycle that looks like "actively
developing the current version and stabilizing the previous version, not the
one from 2-3 versions ago". Always being far behind Flink's latest version
also suppresses users' motivation to contribute to Flink.

2) Extending the release period also eases the workload of committers, which
I think can improve the contributor experience. I have seen several times
that when contributors want to work on new features or improvements, we have
to respond with "sorry, we are currently focused on implementing/stabilizing
the planned features for this version", and those contributions mostly end
up stalled and never brought up again.

BTW, extending the release period also has downsides. It slows down the
delivery speed of new features. And I'm also not sure how much it would
improve the above 2 issues.

Looking forward to hearing some feedback from the community, both users and
developers.

Best,
Kurt


On Wed, Jun 2, 2021 at 8:39 PM JING ZHANG  wrote:

> Hi Dawid, Joe & Xintong,
>
> Thanks for starting the discussion.
>
> I would like to polish Window TVFs[1][2] which is a popular feature in SQL
> introduced in 1.13.
>
> The detailed items are as follows.
> 1. Add more computations based on Window TVF
> * Window Join (which is already merged in master branch)
> * Window Table Function
> * Window Deduplicate
> 2. Finish related JIRA to improve user experience
>* Add offset support for TUMBLE, HOP, session window
> 3. Complement the missing functions compared to the group window, which is
> a precondition of deprecating the legacy Grouped Window Function in the
> later versions.
>* Support Session windows
>* Support allow-lateness
>* Support retract input stream
>* Support window TVF in batch mode
>
> [1] https://issues.apache.org/jira/browse/FLINK-19604
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows
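>
> As a concrete reference point, this is the tumbling window TVF shape from
> FLIP-145 that the items above build on (table and column names are only
> illustrative):
>
> SELECT window_start, window_end, SUM(price)
> FROM TABLE(
>   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end;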
>
> Best regards,
> JING ZHANG
>
> Xintong Song  wrote on Wed, Jun 2, 2021 at 6:45 PM:
>
> > Hi all,
> >
> > As 1.13 has been released for a while, I think it is a good time to start
> > planning for the 1.14 release cycle.
> >
> > - Release managers: This time we'd like to have a team of 3 release
> > managers. Dawid, Joe and I would like to volunteer for it. What do you
> > think about it?
> >
> > - Timeline: According to our approximate 4 months release period, we
> > propose to aim for a feature freeze roughly in early August (which could
> > mean something like early September for the 1.14. release). Does it work
> > for everyone?
> >
> > - Collecting features: It would be helpful to have a rough overview of
> the
> > new features that will likely be included in this release. We have
> created
> > a wiki page [1] for collecting such information. We'd like to kindly ask
> > all committers to fill in the page with features that they intend to work
> > on.
> >
> > We would also like to emphasize some aspects of the engineering process:
> >
> > - Stability of master: This has been an issue during the 1.13 feature
> > freeze phase and it is still going on. We encourage every committer to
> not
> > merge PRs through the Github button, but do this manually, with caution
> for
> > the commits merged after the CI being triggered. It would be appreciated
> to
> > always build the project before merging to master.
> >
> > - Documentation: Please try to see documentation as an integrated part of
> > the engineering process and don't push it to the feature freeze phase or
> > even after. You might even think about going documentation first. We, as
> > the Flink community, are adding great stuff, that is pushing the limits
> of
> > streaming data processors, with every release. We should also make this
> > stuff usable for our users by documenting it well.
> >
> > - Promotion of 1.14: What applies to documentation also applies to all
> the
> > activity around the release. We encourage every contributor to also think
> > about, plan and prepare activities like blog posts and talk, that will
> > promote and spread the release once it is done.
> >
> > Please let us know what you think.
> >
> > Thank you~
> > Dawid, Joe & Xintong
> >
> > [1] 

Re: [DISCUSS] SQL CTAS Syntax

2021-05-28 Thread Kurt Young
Hi Konstantin,

From my understanding, this syntax has 2 major benefits:

1. Just like you said, it saves the effort of specifying the schema,
especially when hundreds of fields are involved.
2. When using CREATE TABLE xx AS TABLE yy, it gives us the possibility to
enable schema evolution, and it seems pretty natural to do so.
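
For illustration, here is a rough sketch of how both forms could be used
from the Table API, assuming the syntax proposed in this thread. The table
names and connector options are placeholders, and neither statement is
supported by Flink today, so please treat this purely as a sketch of the
proposal rather than working code:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasSyntaxSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Benefit 1 (proposed CTAS): the sink schema is derived from the
        // query, so hundreds of fields don't have to be spelled out by hand.
        tEnv.executeSql(
                "CREATE TABLE my_sink WITH ('connector' = 'kafka') "
                        + "AS SELECT id, name, amount FROM my_source");

        // Benefit 2 (proposed form): copy the structure and data of an
        // existing table, which opens the door to propagating later schema
        // changes from yy to xx.
        tEnv.executeSql("CREATE TABLE xx AS TABLE yy");
    }
}

Compared with the explicit CREATE TABLE ... plus INSERT INTO version you
mentioned, the main saving is the hand-written schema and the second
statement.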

Best,
Kurt


On Fri, May 28, 2021 at 5:44 PM Konstantin Knauf  wrote:

> Hi everyone,
>
> quick question for my understanding: how is this different to
>
> CREATE TABLE IF NOT EXISTS my_table (
> ...
> ) WITH (
> ...
> );
> INSERT INTO my_table SELECT ...;
>
> ?
>
> Is it only about a) not having to specify the schema and b) a more
> condensed syntax?
>
> Cheers,
>
> Konstantin
>
> On Fri, May 28, 2021 at 11:30 AM Jark Wu  wrote:
>
> > Thanks Danny for starting the discussion of extending CTAS syntax.
> >
> > I think this is a very useful feature for data integration and ETL jobs
> (a
> > big use case of Flink).
> > Many users complain a lot that manually defining schemas for sources and
> > sinks is hard.
> > CTAS helps users to write ETL jobs without defining any schemas of
> sources
> > and sinks.
> > CTAS automatically creates physical tables in external systems, and
> > automatically
> > maps external tables to Flink tables with the help of catalogs (e.g.
> > PgCatalog, HiveCatalog).
> >
> > On the other hand, the schema of the SELECT query is fixed after compile
> > time.
> > CTAS TABLE extends the syntax which allows dynamic schema during runtime,
> > semantically it streaming copies the up-to-date structure and data (if
> run
> > in streaming mode).
> > So I think CTAS TABLE is a major step forward for data integration, it
> > defines a syntax
> > which allows the underlying streaming pipeline automatically migrate
> schema
> > evolution
> > (e.g. ADD COLUMN) from source tables to sink tables without stopping jobs
> > or updating SQLs.
> >
> > Therefore, I'm +1 for the feature.
> >
> > Best,
> > Jark
> >
> > On Fri, 28 May 2021 at 16:22, JING ZHANG  wrote:
> >
> > > Hi Danny,
> > >
> > > Thanks for starting this discussion.
> > >
> > >
> > >
> > > Big +1 for this feature. Both CTAS AND CREATE TABLE LIKE are very
> useful
> > > features. IMO, it is clear to separate them into two parts in the
> > `syntax`
> > > character. 
> > >
> > >
> > >
> > > First, I have two related problems:
> > >
> > >
> > > 1. Would `create table` in CTAS trigger to create a physical table in
> > > external storage system?
> > >
> > > For example, now normal `create table` would only define a connecting
> > with
> > > an existed external Kafka topic instead of trigger to create a physical
> > > kafka topic in kafka cluster. Does this behavior still work for CTAS
> AND
> > > CREATE TABLE LIKE?
> > >
> > >
> > > 2. Would the data sync  of CTAS run continuously if select works on a
> > > unbounded source?
> > >
> > > Since sub select query may works on unbounded source in Flink, which is
> > > different with other system (postgres, spark, hive, mysql). Does data
> > sync
> > > continuously run or just sync the snapshot at the job submit?
> > >
> > >
> > >
> > > Besides, I have some minor problems which is mentioned in your email.
> > >
> > >
> > >
> > > > how to write data into existing table with history data declare [IF
> NOT
> > > EXISTS] keywords and we ignore the table creation but the pipeline
> still
> > > starts up
> > >
> > >
> > >
> > > Maybe we should check old schema and new schema. What would happen if
> > > schema of existed table is different with new schema?
> > >
> > >
> > >
> > > > How to match sub-database and sub-table ? Use regex style source
> table
> > > name
> > >
> > >
> > >
> > >1. What would happen if schema of matched tables different with each
> > > other?
> > >
> > >2. What orders to sync data of all matched table? Sync data from all
> > > matched tables one by one or at the same time?
> > >
> > >
> > >
> > > >  AS select_statement: copy source table data into target
> > >
> > >
> > >
> > > User could  explicitly specify the data type for each column in the
> CTAS,
> > > what happened when run the following example. The demo is from MySQL
> > > document,
> > https://dev.mysql.com/doc/refman/5.6/en/create-table-select.html
> > > , the result is a bit unexpected, I wonder
> > >
> > > What the behavior would be in Flink.
> > >
> > >
> > > [image: image.png]
> > >
> > > Best,
> > > JING ZHANG
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: Re: [ANNOUNCE] New Apache Flink Committer - Rui Li

2021-04-22 Thread Kurt Young
Congratulations Rui!

Best,
Kurt


On Thu, Apr 22, 2021 at 2:10 PM Arvid Heise  wrote:

> Congrats!
>
> Best,
>
> Arvid
>
> On Thu, Apr 22, 2021 at 8:07 AM Xingbo Huang  wrote:
>
> > Congratulations Rui~!
> >
> > Best,
> > Xingbo
> >
> > Xintong Song  于2021年4月22日周四 下午1:58写道:
> >
> > > Congrats, Rui~!
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Thu, Apr 22, 2021 at 11:52 AM Yun Gao  >
> > > wrote:
> > >
> > > > Congratulations Rui!
> > > >
> > > > Best,
> > > >  Yun
> > > >
> > > >
> > > > --
> > > > Sender:Nicholas Jiang
> > > > Date:2021/04/22 11:26:05
> > > > Recipient:
> > > > Theme:Re: [ANNOUNCE] New Apache Flink Committer - Rui Li
> > > >
> > > > Congrats, Rui!
> > > >
> > > > Best,
> > > > Nicholas Jiang
> > > >
> > > >
> > > >
> > > > --
> > > > Sent from:
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > >
> > >
> >
>


[jira] [Created] (FLINK-22278) Refactor sql client's DynamicResult

2021-04-14 Thread Kurt Young (Jira)
Kurt Young created FLINK-22278:
--

 Summary: Refactor sql client's DynamicResult
 Key: FLINK-22278
 URL: https://issues.apache.org/jira/browse/FLINK-22278
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: Kurt Young


We can simplify the design around the SQL client's Executor and DynamicResult by 
reducing the Executor's responsibility when retrieving SELECT results.

Page-related logic should be handled by the different CliResultViews instead of 
the Executor and the different DynamicResults.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22277) Refactor sql client's DynamicResult

2021-04-14 Thread Kurt Young (Jira)
Kurt Young created FLINK-22277:
--

 Summary: Refactor sql client's DynamicResult 
 Key: FLINK-22277
 URL: https://issues.apache.org/jira/browse/FLINK-22277
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: Kurt Young


Right now, the SQL client's Executor has different result handling logic for 
each result display mode. 

Each kind of result is handled by a child class of `DynamicResult`. 

This leads to introducing page-related APIs on the Executor, such as 
`snapshotResult` and `retrieveResultPage`, which I think is inappropriate and 
makes things complicated. 

It would be beneficial to simplify the Executor's responsibility for retrieving 
results to simply streaming back the SELECT result, and to move the logic for 
dealing with the different display modes into CliResultView. 
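
A rough sketch of the intended split, just to make the direction concrete.
The interface and method names below are illustrative only and are not the
actual classes in the SQL client module:

import java.util.Iterator;
import java.util.List;

/** Illustrative only: the Executor's result-retrieval responsibility
 *  reduced to streaming rows back, with no paging methods. */
interface ExecutorResultSketch {
    Iterator<List<Object>> retrieveResult(String selectStatement);
}

/** Paging and display-mode logic live in the view layer instead. */
abstract class CliResultViewSketch {
    private final Iterator<List<Object>> rows;

    protected CliResultViewSketch(Iterator<List<Object>> rows) {
        this.rows = rows;
    }

    /** Pull up to pageSize rows from the stream and render them; the
     *  paging logic lives here instead of in the Executor. */
    void displayNextPage(int pageSize) {
        int shown = 0;
        while (shown < pageSize && rows.hasNext()) {
            render(rows.next());
            shown++;
        }
    }

    protected abstract void render(List<Object> row);
}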



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22178) Support ignore-first-line option in new csv format

2021-04-09 Thread Kurt Young (Jira)
Kurt Young created FLINK-22178:
--

 Summary: Support ignore-first-line option in new csv format
 Key: FLINK-22178
 URL: https://issues.apache.org/jira/browse/FLINK-22178
 Project: Flink
  Issue Type: New Feature
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.13.0
Reporter: Kurt Young






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Feature freeze date for 1.13

2021-04-07 Thread Kurt Young
Hi Yuval,

I think you are good to go, since there is no objection from the PMC.

Best,
Kurt


On Wed, Apr 7, 2021 at 12:48 AM Yuval Itzchakov  wrote:

> Hi Guowei,
>
> Who should I speak to regarding this? I am at the final stages of the PR I
> believe (Shengkai is kindly helping me make things work) and I would like
> to push this into 1.13.
>
> On Fri, Apr 2, 2021 at 5:43 AM Guowei Ma  wrote:
>
>> Hi, Yuval
>>
>> Thanks for your contribution. I am not a SQL expert, but it seems to be
>> beneficial to users, and the amount of code is not much and only left is
>> the test. Therefore, I am open to this entry into rc1.
>> But according to the rules, you still have to see if there are other
>> PMC's objections within 48 hours.
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 10:33 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi All,
>>>
>>> I would really love to merge https://github.com/apache/flink/pull/15307
>>> prior to 1.13 release cutoff, it just needs some more tests which I can
>>> hopefully get to today / tomorrow morning.
>>>
>>> This is a critical fix as now predicate pushdown won't work for any
>>> stream which generates a watermark and wants to push down predicates.
>>>
>>> On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:
>>>
>>>> Thanks Dawid, I have merged FLINK-20320.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> @Kurt @Arvid I think it's fine to merge those two, as they are pretty
>>>>> much finished. We can wait for those two before creating the RC0.
>>>>>
>>>>> @Leonard Personally I'd be ok with 3 more days for that single PR. I
>>>>> find the request reasonable and I second that it's better to have a proper
>>>>> review rather than rush unfinished feature and try to fix it later.
>>>>> Moreover it got broader support. Unless somebody else objects, I think we
>>>>> can merge this PR later and include it in RC1.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>> On 01/04/2021 08:39, Arvid Heise wrote:
>>>>>
>>>>> Hi Dawid and Guowei,
>>>>>
>>>>> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We
>>>>> are pretty much just waiting for AZP to turn green, it's separate from
>>>>> other components, and it's a super useful feature for Flink users.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/15054
>>>>>
>>>>> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>>>>>
>>>>>> Hi Guowei and Dawid,
>>>>>>
>>>>>> I want to request the permission to merge this feature [1], it's a
>>>>>> useful improvement to sql client and won't affect
>>>>>> other components too much. We were plan to merge it yesterday but met
>>>>>> some tricky multi-process issue which
>>>>>> has a very high possibility hanging the tests. It took us a while to
>>>>>> find out the root cause and fix it.
>>>>>>
>>>>>> Since it's not too far away from feature freeze and RC0 also not
>>>>>> created yet, thus I would like to include this
>>>>>> in 1.13.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, community:
>>>>>>>
>>>>>>> Friendly reminder that today (3.31) is the last day of feature
>>>>>>> development. Under normal circumstances, you will not be able to submit 
>>>>>>> new
>>>>>>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>>>>>> testing, welcome to help test together.
>>>>>>> After the test is relatively stable, we will cut the release-1.13
>>>>>>> branch.
>>>>>>>
>>>

Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Kurt Young
Thanks Dawid, I have merged FLINK-20320.

Best,
Kurt


On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> @Kurt @Arvid I think it's fine to merge those two, as they are pretty much
> finished. We can wait for those two before creating the RC0.
>
> @Leonard Personally I'd be ok with 3 more days for that single PR. I find
> the request reasonable and I second that it's better to have a proper
> review rather than rush unfinished feature and try to fix it later.
> Moreover it got broader support. Unless somebody else objects, I think we
> can merge this PR later and include it in RC1.
>
> Best,
>
> Dawid
> On 01/04/2021 08:39, Arvid Heise wrote:
>
> Hi Dawid and Guowei,
>
> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
> pretty much just waiting for AZP to turn green, it's separate from other
> components, and it's a super useful feature for Flink users.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink/pull/15054
>
> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>
>> Hi Guowei and Dawid,
>>
>> I want to request the permission to merge this feature [1], it's a useful
>> improvement to sql client and won't affect
>> other components too much. We were plan to merge it yesterday but met
>> some tricky multi-process issue which
>> has a very high possibility hanging the tests. It took us a while to find
>> out the root cause and fix it.
>>
>> Since it's not too far away from feature freeze and RC0 also not created
>> yet, thus I would like to include this
>> in 1.13.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:
>>
>>> Hi, community:
>>>
>>> Friendly reminder that today (3.31) is the last day of feature
>>> development. Under normal circumstances, you will not be able to submit new
>>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>> testing, welcome to help test together.
>>> After the test is relatively stable, we will cut the release-1.13 branch.
>>>
>>> Best,
>>> Dawid & Guowei
>>>
>>>
>>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>>> wrote:
>>>
>>>> +1 for the 31st of March for the feature freeze.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>>>> wrote:
>>>>
>>>> > +1 for March 31st for the feature freeze.
>>>> >
>>>> >
>>>> >
>>>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>>>> dwysakow...@apache.org>
>>>> > wrote:
>>>> >
>>>> > > Thank you Thomas! I'll definitely check the issue you linked.
>>>> > >
>>>> > > Best,
>>>> > >
>>>> > > Dawid
>>>> > >
>>>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>>>> > > > Hi Dawid,
>>>> > > >
>>>> > > > Thanks for the heads up.
>>>> > > >
>>>> > > > Regarding the "Rebase and merge" button. I find that merge option
>>>> > useful,
>>>> > > > especially for small simple changes and for backports. The
>>>> following
>>>> > > should
>>>> > > > help to safeguard from the issue encountered previously:
>>>> > > > https://github.com/jazzband/pip-tools/issues/1085
>>>> > > >
>>>> > > > Thanks,
>>>> > > > Thomas
>>>> > > >
>>>> > > >
>>>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>>>> > dwysakow...@apache.org
>>>> > > >
>>>> > > > wrote:
>>>> > > >
>>>> > > >> Hi devs, users!
>>>> > > >>
>>>> > > >> 1. *Feature freeze date*
>>>> > > >>
>>>> > > >> We are approaching the end of March which we agreed would be the
>>>> time
>>>> > > for
>>>> > > >> a Feature Freeze. From the knowledge I've gather so far it still
>>>> seems
>>>> > > to
>>>> > > >> be a viable plan. I think it is a good time to agree on a

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Kurt Young
Hi Guowei and Dawid,

I want to request permission to merge this feature [1]. It's a useful
improvement to the SQL client and won't affect other components too much.
We were planning to merge it yesterday but hit a tricky multi-process issue
that had a very high chance of hanging the tests. It took us a while to
find the root cause and fix it.

Since it's not too far away from the feature freeze and RC0 has not been
created yet, I would like to include this in 1.13.

[1] https://issues.apache.org/jira/browse/FLINK-20320

Best,
Kurt


On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:

> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able to submit new
> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
> testing, welcome to help test together.
> After the test is relatively stable, we will cut the release-1.13 branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
> wrote:
>
>> +1 for the 31st of March for the feature freeze.
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>> wrote:
>>
>> > +1 for March 31st for the feature freeze.
>> >
>> >
>> >
>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> > wrote:
>> >
>> > > Thank you Thomas! I'll definitely check the issue you linked.
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>> > > > Hi Dawid,
>> > > >
>> > > > Thanks for the heads up.
>> > > >
>> > > > Regarding the "Rebase and merge" button. I find that merge option
>> > useful,
>> > > > especially for small simple changes and for backports. The following
>> > > should
>> > > > help to safeguard from the issue encountered previously:
>> > > > https://github.com/jazzband/pip-tools/issues/1085
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > > >
>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>> > dwysakow...@apache.org
>> > > >
>> > > > wrote:
>> > > >
>> > > >> Hi devs, users!
>> > > >>
>> > > >> 1. *Feature freeze date*
>> > > >>
>> > > >> We are approaching the end of March which we agreed would be the
>> time
>> > > for
>> > > >> a Feature Freeze. From the knowledge I've gather so far it still
>> seems
>> > > to
>> > > >> be a viable plan. I think it is a good time to agree on a
>> particular
>> > > date,
>> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
>> > > >> (Wednesday next week) as the feature freeze time.
>> > > >>
>> > > >> Similarly as last time, we want to create RC0 on the day after the
>> > > feature
>> > > >> freeze, to make sure the RC creation process is running smoothly,
>> and
>> > to
>> > > >> have a common testing reference point.
>> > > >>
>> > > >> Having said that let us remind after Robert & Dian from the
>> previous
>> > > >> release what it a Feature Freeze means:
>> > > >>
>> > > >> *B) What does feature freeze mean?*After the feature freeze, no new
>> > > >> features are allowed to be merged to master. Only bug fixes and
>> > > >> documentation improvements.
>> > > >> The release managers will revert new feature commits after the
>> feature
>> > > >> freeze.
>> > > >> Rational: The goal of the feature freeze phase is to improve the
>> > system
>> > > >> stability by addressing known bugs. New features tend to introduce
>> new
>> > > >> instabilities, which would prolong the release process.
>> > > >> If you need to merge a new feature after the freeze, please open a
>> > > >> discussion on the dev@ list. If there are no objections by a PMC
>> > member
>> > > >> within 48 (workday)hours, the feature can be merged.
>> > > >>
>> > > >> 2. *Merge PRs from the command line*
>> > > >>
>> > > >> In the past releases it was quite frequent around the Feature
>> Freeze
>> > > date
>> > > >> that we ended up with a broken main branch that either did not
>> compile
>> > > or
>> > > >> there were failing tests. It was often due to concurrent merges to
>> the
>> > > main
>> > > >> branch via the "Rebase and merge" button. To overcome the problem
>> we
>> > > would
>> > > >> like to suggest only ever merging PRs from a command line. Thank
>> you
>> > > >> Stephan for the idea! The suggested workflow would look as follows:
>> > > >>
>> > > >>1. Pull the change and rebase on the current main branch
>> > > >>2. Build the project (e.g. from IDE, which should be faster than
>> > > >>building entire project from cmd) -> this should ensure the
>> project
>> > > compiles
>> > > >>3. Run the tests in the module that the change affects -> this
>> > should
>> > > >>greatly minimize the chances of failling tests
>> > > >>4. Push the change to the main branch
>> > > >>
>> > > >> Let us know what you think!
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Guowei & Dawid
>> > > >>
>> > > >>
>> > > >>
>> > >
>> > >
>> >
>>
>


Re: [DISCUSS] FLIP-162 follow-up discussion

2021-03-08 Thread Kurt Young
Hi Leonard,

Thanks for this careful consideration. Given that the fallback option would
eventually change the behavior twice, which means potentially breaking
users' jobs twice, I am also +1 for not introducing it.

Best,
Kurt

On Fri, Mar 5, 2021 at 3:00 PM Leonard Xu  wrote:

> Hi, all
>
> As the FLIP-162 discussed,  we agreed current time functions’ behavior is
> incorrect and plan to introduce the option 
> *t**able.exec.fallback-legacy-time-function
> *to enable user fallback to incorrect behavior.
>
> (1) The option is convenient for users who want to upgrade to 1.13 but
> don't want to change their sql job, user need to config the option value, 
> *this
> is the first time users influenced by these wrong functions.*
>
> (2) But we didn’t consider that the option will be deleted after one or
> two major versions, users have to change their sql job again at that time
> point, *this the second time** users influenced by these wrong functions.*
>
> (3) Besides, maintaining two sets of functions is prone to bugs.
>
> I’ve discussed with some community developers offline, they tend to solve
> these functions at once i.e. Correct the wrong functions directly and do
> not introduce this option.
>
> Considering that we will delete the configuration eventually,  comparing
> hurting users twice and bothering them for a long time, I would rather hurt
> users once.
> *Thus I also +1* that we should directly correct these wrong functions
> and remove the wrong functions at the same time.
>
>
> If we can make a consensus in this thread, I think we can remove this
> option support in FLIP-162.
> How do you think?
>
> Best,
> Leonard
>
>
>
>
>


Re: [VOTE] FLIP-162: Consistent Flink SQL time function behavior

2021-03-03 Thread Kurt Young
+1 (binding)

Best,
Kurt


On Wed, Mar 3, 2021 at 3:43 PM Timo Walther  wrote:

> +1 (binding)
>
> Regards,
> Timo
>
> On 03.03.21 04:14, Jark Wu wrote:
> > +1 (binding)
> >
> > Best,
> > Jark
> >
> > On Tue, 2 Mar 2021 at 10:42, Leonard Xu  wrote:
> >
> >> Hi all,
> >>
> >> I would like to start the vote for FLIP-162 [1], which has been
> discussed
> >> and
> >> reached a consensus in the discussion thread [2].
> >>
> >> Please vote +1 to approve the FLIP, or -1 with a comment.
> >>
> >> The vote will be open until March 5th (72h), unless there is an
> objection
> >> or not enough votes.
> >>
> >> Best,
> >> Leonard
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162:+Consistent+Flink+SQL+time+function+behavior
> >>>
> >> [2]
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-162-Consistent-Flink-SQL-time-function-behavior-tc48116.html
> >> <
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-162-Consistent-Flink-SQL-time-function-behavior-tc48116.html
> >>>
> >
>
>


Re: [VOTE] Release 1.12.2, release candidate #2

2021-03-01 Thread Kurt Young
+1 (binding)

- We mainly checked the patch for FLINK-20663 [1] and confirmed that the
OutOfManagedMemory error no longer occurs.

[1] https://issues.apache.org/jira/browse/FLINK-20663

Best,
Kurt


On Tue, Mar 2, 2021 at 12:41 PM Yu Li  wrote:

> +1 (binding)
>
> - Checked the diff between 1.12.1 and 1.12.2-rc2: OK (
> https://github.com/apache/flink/compare/release-1.12.1...release-1.12.2-rc2
> )
>   - jackson version has been bumped to 2.10.5.1 through FLINK-21020 and all
> NOTICE files updated correctly
>   - beanutils version has been bumped to 1.9.4 through FLINK-21123 and all
> NOTICE files updated correctly
>   - testcontainer version has been bumped to 1.15.1 through FLINK-21277 and
> no NOTICE files impact
>   - japicmp version has been bumped to 1.12.1 and no NOTICE files impact
> - Checked release notes: OK
> - Checked sums and signatures: OK
> - Maven clean install from source: OK
> - Checked the jars in the staging repo: OK
> - Checked the website updates: OK (minor: corrected fix version of
> FLINK-21515  to make
> sure the website PR consistent with release note)
>
> Note: there's a vulnerability suspicion against 1.12.2-rc2 reported in
> user-zh mailing list [1] w/o enough evidence/information. Have asked the
> reporter to do more testing to confirm and I don't think it's a blocker for
> the release, but just a note here in case anyone has a different opinion.
>
> Thanks a lot for managing the new RC!
>
> Best Regards,
> Yu
>
> [1] http://apache-flink.147419.n8.nabble.com/flink-1-12-2-rc2-td11023.html
>
> On Tue, 2 Mar 2021 at 01:51, Piotr Nowojski  wrote:
>
> > +1 (binding)
> >
> > For the RC2 I have additionally confirmed that "stop-with-savepoint", and
> > "stop-with-savepoint --drain" seems to be working.
> >
> > Piotrek
> >
> > pon., 1 mar 2021 o 11:18 Matthias Pohl 
> > napisał(a):
> >
> > > Thanks for managing release 1.12.2, Yuan & Roman.
> > >
> > > +1 (non-binding)
> > >
> > > - Verified checksums and GPG of artifacts in [1]
> > > - Build the sources locally without errors
> > > - Started a local standalone cluster and deployed WordCount without
> > > problems (no suspicious logs identified)
> > > - Verified FLINK-21030 [2] by running the example jobs from the
> > > FLINK-21030-related SavepointITCase tests
> > >
> > > Best,
> > > Matthias
> > >
> > > [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.2-rc2/
> > > [2] https://issues.apache.org/jira/browse/FLINK-21030
> > >
> > > On Sun, Feb 28, 2021 at 2:41 PM Yuan Mei 
> wrote:
> > >
> > > > Hey Roman,
> > > >
> > > > Thank you very much for preparing RC2.
> > > >
> > > > +1 from my side.
> > > >
> > > > 1. Verified Checksums and GPG signatures.
> > > > 2. Verified that the source archives do not contain any binaries.
> > > > 3. Successfully Built the source with Maven.
> > > > 4. Started a local Flink cluster, ran the streaming WordCount example
> > > with
> > > > WebUI,
> > > > checked the output and JM/TM log, no suspicious output/log.
> > > > 5. Repeat Step 4 with the binary release as well, no suspicious
> > > output/log.
> > > > 6. Checked for source and binary release to make sure both an Apache
> > > > License file and a NOTICE file are included.
> > > > 7. Manually verified that no pom file changes between 1.12.2-rc1 and
> > > > 1.12.2-rc2; no obvious license problem.
> > > > 8. Review the release PR for RC2 updates, and double confirmed the
> > > > change-list for 1.12.2.
> > > >
> > > > Best,
> > > > Yuan
> > > >
> > > > On Sat, Feb 27, 2021 at 7:19 AM Roman Khachatryan 
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Please review and vote on the release candidate #1 for the version
> > > > 1.12.2,
> > > > > as follows:
> > > > > [ ] +1, Approve the release
> > > > > [ ] -1, Do not approve the release (please provide specific
> comments)
> > > > >
> > > > >
> > > > > The complete staging area is available for your review, which
> > includes:
> > > > > * JIRA release notes [1],
> > > > > * the official Apache source release and binary convenience
> releases
> > to
> > > > be
> > > > > deployed to dist.apache.org [2], which are signed with the key
> with
> > > > > fingerprint 0D545F264D2DFDEBFD4E038F97B4625E2FCF517C [3],
> > > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > > * source code tag "release-1.12.2-rc2" [5],
> > > > > * website pull request listing the new release and adding
> > announcement
> > > > blog
> > > > > post [6].
> > > > >
> > > > > The vote will be open for at least 72 hours. It is adopted by
> > majority
> > > > > approval, with at least 3 PMC affirmative votes.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502=12315522
> > > > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.2-rc2/
> > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > [4]
> > > > >
> > >
> 

Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-03-01 Thread Kurt Young
I'm +1 for either:
1. introducing a SQL-client-specific option, or
2. introducing a table config option and making it apply to both the table
module and the SQL client.

It would be the FLIP owner's call to decide.
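
For concreteness, here is a minimal sketch of what option 2 could look like
from the Table API side. Note that "table.dml-sync" is only the name
proposed in this discussion, not an existing configuration key, and the
table names are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DmlSyncOptionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Hypothetical: if the proposed option existed and applied to the
        // table module, setting it would make the INSERT below block until
        // the job finishes; the SQL client would honor the same key.
        tEnv.getConfig().getConfiguration().setBoolean("table.dml-sync", true);

        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
    }
}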

Best,
Kurt


On Mon, Mar 1, 2021 at 3:25 PM Timo Walther  wrote:

> We could also think about reading this config option in Table API. The
> effect would be to call `await()` directly in an execute call. I could
> also imagine this to be useful esp. when you fire a lot of insert into
> queries. We had the case before that users where confused that the
> execution happens asynchronously, such an option could prevent this to
> happen again.
>
> Regards,
> Timo
>
> On 01.03.21 05:14, Kurt Young wrote:
> > I also asked some users about their opinion that if we introduce some
> > config prefixed with "table" but doesn't
> > have affection with methods in Table API and SQL. All of them are kind of
> > shocked by such question, asking
> > why we would do anything like this.
> >
> > This kind of reaction actually doesn't surprise me a lot, so I jumped in
> > and challenged this config option even
> > after the FLIP had already been accepted.
> >
> > If we only have to define the execution behavior for multiple statements
> in
> > SQL client, we should only introduce
> > a config option which would tell users it's affection scope by its name.
> > Prefixing with "table" is definitely not a good
> > idea here.
> >
> > Best,
> > Kurt
> >
> >
> > On Fri, Feb 26, 2021 at 9:39 PM Leonard Xu  wrote:
> >
> >> Hi, all
> >>
> >> Look like there’s only one divergence about option [ table | sql-client
> >> ].dml-sync in this thread, correct me if I’m wrong.
> >>
> >> 1. Leaving the context of this thread, from a user's perspective,
> >> the table.xx configurations should take effect in Table API & SQL,
> >> the sql-client.xx configurations should only take effect in sql-client.
> >>   In my(the user's) opinion, other explanations are counterintuitive.
> >>
> >> 2.  It should be pointed out that both all existed table.xx
> configurations
> >> like table.exec.state.ttl, table.optimizer.agg-phase-strategy,
> >> table.local-time-zone,etc..  and the proposed sql-client.xx
> configurations
> >> like sql-client.verbose, sql-client.execution.max-table-result.rows
> >> comply with this convention.
> >>
> >> 3. Considering the portability to support different CLI tools
> (sql-client,
> >> sql-gateway, etc.), I prefer table.dml-sync.
> >>
> >> In addition, I think sql-client/sql-gateway/other CLI tools can be
> placed
> >> out of flink-table module even in an external project, this should not
> >> affect our conclusion.
> >>
> >>
> >> Hope this can help you.
> >>
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>
> >>> 在 2021年2月25日,18:51,Shengkai Fang  写道:
> >>>
> >>> Hi, everyone.
> >>>
> >>> I do some summaries about the discussion about the option. If the
> summary
> >>> has errors, please correct me.
> >>>
> >>> `table.dml-sync`:
> >>> - take effect for `executeMultiSql` and sql client
> >>> - benefit: SQL script portability. One script for all platforms.
> >>> - drawback: Don't work for `TableEnvironment#executeSql`.
> >>>
> >>> `table.multi-dml-sync`:
> >>> - take effect for `executeMultiSql` and sql client
> >>> - benefit: SQL script portability
> >>> - drawback: It's confused when the sql script has one dml statement but
> >>> need to set option `table.multi-dml-sync`
> >>>
> >>> `client.dml-sync`:
> >>> - take effect for sql client only
> >>> - benefit: clear definition.
> >>> - drawback: Every platform needs to define its own option. Bad SQL
> script
> >>> portability.
> >>>
> >>> Just as Jark said, I think the `table.dml-sync` is a good choice if we
> >> can
> >>> extend its scope and make this option works for `executeSql`.
> >>> It's straightforward and users can use this option now in table api.
> The
> >>> drawback is the  `TableResult#await` plays the same role as the option.
> >> I
> >>> don't think the drawback is really critical because many systems have
> >>> commands play the same role with the different names.
> >>>
> >

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-03-01 Thread Kurt Young
fied streaming-batch semantics? *
> >>>>>>>
> >>>>>>> I don't think so. First of all, what's the unified streaming-batch
> >>>>>>> semantic?
> >>>>>>> I think it means the* eventual result* instead of the *behavior*.
> >>>>>>> It's hard to say we have provided unified behavior for streaming
> and
> >>>>> batch
> >>>>>>> jobs,
> >>>>>>> because for example unbounded aggregate behaves very differently.
> >>>>>>> In batch mode, it only evaluates once for the bounded data and
> >>>>>>> emits the
> >>>>>>> aggregate result once.
> >>>>>>> But in streaming mode, it evaluates for each row and emits the
> >>>>>>> updated
> >>>>>>> result.
> >>>>>>> What we have always emphasized "unified streaming-batch
> >>>>>>> semantics" is
> >>>>> [1]
> >>>>>>>
> >>>>>>>> a query produces exactly the same result regardless whether its
> >>>>>>>> input
> >>>>> is
> >>>>>>> static batch data or streaming data.
> >>>>>>>
> >>>>>>>  From my understanding, the "semantic" means the "eventual result".
> >>>>>>> And time functions are non-deterministic, so it's reasonable to get
> >>>>>>> different results for batch and streaming mode.
> >>>>>>> Therefore, I think it doesn't break the unified streaming-batch
> >>>>> semantics
> >>>>>>> to evaluate per-record for streaming and
> >>>>>>> query-start for batch, as the semantic doesn't means behavior
> >>>>>>> semantic.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>> [1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html
> >>>>>>>
> >>>>>>> On Tue, 2 Feb 2021 at 18:34, Fabian Hueske 
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> Sorry for joining this discussion late.
> >>>>>>>> Let me give some thought to two of the arguments raised in this
> >>>>>>>> thread.
> >>>>>>>>
> >>>>>>>> Time functions are inherently non-determintistic:
> >>>>>>>> --
> >>>>>>>> This is of course true, but IMO it doesn't mean that the
> >>>>>>>> semantics of
> >>>>>>> time
> >>>>>>>> functions do not matter.
> >>>>>>>> It makes a difference whether a function is evaluated once and
> it's
> >>>>>>> result
> >>>>>>>> is reused or whether it is invoked for every record.
> >>>>>>>> Would you use the same logic to justify different behavior of
> >>>>>>>> RAND() in
> >>>>>>>> batch and streaming queries?
> >>>>>>>>
> >>>>>>>> Provide the semantics that most users expect:
> >>>>>>>> --
> >>>>>>>> I don't think it is clear what most users expect, esp. if we also
> >>>>> include
> >>>>>>>> future users (which we certainly want to gain) into this
> >>>>>>>> assessment.
> >>>>>>>> Our current users got used to the semantics that we introduced.
> >>>>>>>> So I
> >>>>>>>> wouldn't be surprised if they would say stick with the current
> >>>>> semantics.
> >>>>>>>> However, we are also claiming standard SQL compliance and stress
> >>>>>>>> the
> >>>>> goal
> >>>>>>>> of batch-stream unification.
> >>>>>>>> So I would assume that new SQL users expect standard compliant
> >>>>>>>> behavior
> >>>>>>> for
> >>>>>>>> batch and streaming queries.
> >>>

Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-28 Thread Kurt Young
> >>> as they expect it should take effect on
> >>> the Table API.
> >>>
> >>> If we want to introduce an unified "table.dml-sync" option, I prefer
> >>> it should be implemented on Table API and affect all the DMLs on
> >>> Table API (`tEnv.executeSql`, `Table.executeInsert`, `StatementSet`),
> >>> as I have mentioned before [1].
> >>>
> >>>> It would be very straightforward that it affects all the DMLs on SQL
> CLI
> >>> and
> >>> TableEnvironment (including `executeSql`, `StatementSet`,
> >>> `Table#executeInsert`, etc.).
> >>> This can also make SQL CLI easy to support this configuration by
> passing
> >>> through to the TableEnv.
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>>
> >>> [1]:
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-163-SQL-Client-Improvements-tp48354p48665.html
> >>>
> >>>
> >>> On Wed, 24 Feb 2021 at 10:39, Kurt Young  wrote:
> >>>
> >>>> If we all agree the option should only be handled by sql client, then
> >> why
> >>>> don't we
> >>>> just call it `sql-client.dml-sync`? As you said, calling it
> >>>> `table.dml-sync` but has no
> >>>> affection in `TableEnv.executeSql("INSERT INTO")` will also cause a
> big
> >>>> confusion for
> >>>> users.
> >>>>
> >>>> The only concern I saw is if we introduce
> >>>> "TableEnvironment.executeMultiSql()" in the
> >>>> future, how do we control the synchronization between statements? TBH
> I
> >>>> don't really
> >>>> see a strong requirement for such interfaces. Right now, we have a
> >> pretty
> >>>> clear semantic
> >>>> of `TableEnv.executeSql`, and it's very convenient for users if they
> >> want
> >>>> to execute multiple
> >>>> sql statements. They can simulate either synced or async execution
> with
> >>>> this building block.
> >>>>
> >>>> This will introduce slight overhead for users, but compared to the
> >>>> confusion we might
> >>>> cause if we introduce such a method of our own, I think it's better to
> >> wait
> >>>> for some more
> >>>> feedback.
> >>>>
> >>>> Best,
> >>>> Kurt
> >>>>
> >>>>
> >>>> On Tue, Feb 23, 2021 at 9:45 PM Timo Walther 
> >> wrote:
> >>>>
> >>>>> Hi Kurt,
> >>>>>
> >>>>> we can also shorten it to `table.dml-sync` if that would help. Then
> it
> >>>>> would confuse users that do a regular `.executeSql("INSERT INTO")`
> in a
> >>>>> notebook session.
> >>>>>
> >>>>> In any case users will need to learn the semantics of this option.
> >>>>> `table.multi-dml-sync` should be described as "If a you are in a
> multi
> >>>>> statement environment, execute DMLs synchrounous.". I don't have a
> >>>>> strong opinion on shortening it to `table.dml-sync`.
> >>>>>
> >>>>> Just to clarify the implementation: The option should be handled by
> the
> >>>>> SQL Client only, but the name can be shared accross platforms.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 23.02.21 09:54, Kurt Young wrote:
> >>>>>> Sorry for the late reply, but I'm confused by
> `table.multi-dml-sync`.
> >>>>>>
> >>>>>> IIUC this config will take effect with 2 use cases:
> >>>>>> 1. SQL client, either interactive mode or executing multiple
> >> statements
> >>>>> via
> >>>>>> -f. In most cases,
> >>>>>> there will be only one INSERT INTO statement but we are controlling
> >> the
> >>>>>> sync/async behavior
> >>>>>> with "*multi-dml*-sync". I think this will confuse a lot of users.
> >>>>> Besides,
> >>>>>>
> >>>>>> 2. TableEnvironment#executeMultiSql(), but this is future work, we
> are
> >>>>> als

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-02-25 Thread Kurt Young


Hi Timo,

First of all, I want to thank you for introducing this planner design back
in 1.9. It was great work that allowed lots of Blink features to be merged
into Flink in a reasonably short time, and it greatly accelerated the
evolution of Table & SQL.

Everything comes with a cost. As you said, we are now facing the overhead
of maintaining two planners, which causes bugs and also increases the
imbalance between them. As a developer, and for the good of all Table & SQL
users, I also think it's better for us to focus on a single planner.

Your proposed roadmap looks good to me. +1 from my side, and thanks
again for all your efforts!

Best,
Kurt


On Thu, Feb 25, 2021 at 5:01 PM Timo Walther  wrote:

> Hi everyone,
>
> since Flink 1.9 we have supported two SQL planners. Most of the original
> plan of FLIP-32 [1] has been implemented. The Blink code merge has been
> completed and many additional features have been added exclusively to
> the new planner. The new planner is now in a much better shape than the
> legacy one.
>
> In order to avoid user confusion, reduce duplicate code, and improve
> maintainability and testing times of the Flink project as a whole we
> would like to propose the following steps to complete FLIP-32:
>
> In Flink 1.13:
> - Deprecate the `flink-table-planner` module
> - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
>
> In Flink 1.14:
> - Drop `flink-table-planner` early
> - Drop many deprecated interfaces and API on demand
> - Rename `flink-table-planner-blink` to `flink-table-planner`
> - Rename `flink-table-runtime-blink` to `flink-table-runtime`
> - Remove references of "Blink" in the code base
>
> This will have an impact on users that still use DataSet API together
> with Table API. With this change we will not support converting between
> DataSet API and Table API anymore. We hope to compensate the missing
> functionality in the new unified TableEnvironment and/or the batch mode
> in DataStream API during 1.14 and 1.15. For this, we are looking for
> further feedback which features are required in Table API/DataStream API
> to have a smooth migration path.
>
> Looking forward to your feedback.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
>


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-23 Thread Kurt Young
If we all agree the option should only be handled by the SQL client, then
why don't we just call it `sql-client.dml-sync`? As you said, calling it
`table.dml-sync` while it has no effect on
`TableEnv.executeSql("INSERT INTO")` will also cause big confusion for
users.

The only concern I see is: if we introduce
"TableEnvironment.executeMultiSql()" in the future, how do we control the
synchronization between statements? TBH I don't really see a strong
requirement for such an interface. Right now, we have pretty clear
semantics for `TableEnv.executeSql`, and it's very convenient for users who
want to execute multiple SQL statements. They can simulate either
synchronous or asynchronous execution with this building block (see the
sketch below).

This introduces slight overhead for users, but compared to the confusion we
might cause if we introduce such a method of our own, I think it's better
to wait for some more feedback.

Best,
Kurt


On Tue, Feb 23, 2021 at 9:45 PM Timo Walther  wrote:

> Hi Kurt,
>
> we can also shorten it to `table.dml-sync` if that would help. Then it
> would confuse users that do a regular `.executeSql("INSERT INTO")` in a
> notebook session.
>
> In any case users will need to learn the semantics of this option.
> `table.multi-dml-sync` should be described as "If a you are in a multi
> statement environment, execute DMLs synchrounous.". I don't have a
> strong opinion on shortening it to `table.dml-sync`.
>
> Just to clarify the implementation: The option should be handled by the
> SQL Client only, but the name can be shared accross platforms.
>
> Regards,
> Timo
>
>
> On 23.02.21 09:54, Kurt Young wrote:
> > Sorry for the late reply, but I'm confused by `table.multi-dml-sync`.
> >
> > IIUC this config will take effect with 2 use cases:
> > 1. SQL client, either interactive mode or executing multiple statements
> via
> > -f. In most cases,
> > there will be only one INSERT INTO statement but we are controlling the
> > sync/async behavior
> > with "*multi-dml*-sync". I think this will confuse a lot of users.
> Besides,
> >
> > 2. TableEnvironment#executeMultiSql(), but this is future work, we are
> also
> > not sure if we will
> > really introduce this in the future.
> >
> > I would prefer to introduce this option for only sql client. For
> platforms
> > Timo mentioned which
> > need to control such behavior, I think it's easy and flexible to
> introduce
> > one on their own.
> >
> > Best,
> > Kurt
> >
> >
> > On Sat, Feb 20, 2021 at 10:23 AM Shengkai Fang 
> wrote:
> >
> >> Hi everyone.
> >>
> >> Sorry for the late response.
> >>
> >> For `execution.runtime-mode`, I think it's much better than
> >> `table.execution.mode`. Thanks for Timo's suggestions!
> >>
> >> For `SHOW CREATE TABLE`, I'm +1 with Jark's comments. We should clarify
> the
> >> usage of the SHOW CREATE TABLE statements. It should be allowed to
> specify
> >> the table that is fully qualified and only works for the table that is
> >> created by the sql statements.
> >>
> >> I have updated the FLIP with suggestions. It seems we have reached a
> >> consensus, I'd like to start a formal vote for the FLIP.
> >>
> >> Please vote +1 to approve the FLIP, or -1 with a comment.
> >>
> >> Best,
> >> Shengkai
> >>
> >> Jark Wu  于2021年2月15日周一 下午10:50写道:
> >>
> >>> Hi Ingo,
> >>>
> >>> 1) I think you are right, the table path should be fully-qualified.
> >>>
> >>> 2) I think this is also a good point. The SHOW CREATE TABLE
> >>> only aims to print DDL for the tables registered using SQL CREATE TABLE
> >>> DDL.
> >>> If a table is registered using Table API,  e.g.
> >>> `StreamTableEnvironment#createTemporaryView(String, DataStream)`,
> >>> currently it's not possible to print DDL for such tables.
> >>> I think we should point it out in the FLIP.
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>>
> >>>
> >>> On Mon, 15 Feb 2021 at 21:33, Ingo Bürk  wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have a couple questions about the SHOW CREATE TABLE statement.
> >>>>
> >>>> 1) Contrary to the example in the FLIP I think the returned DDL should
> >>>> always have the table identifier fully-qualified. Otherwise the DDL
> >>> depends
> >>>> on the current context (catalog/dat

Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-23 Thread Kurt Young
Sorry for the late reply, but I'm confused by `table.multi-dml-sync`.

IIUC this config will take effect in 2 use cases:
1. The SQL client, either in interactive mode or when executing multiple
statements via -f. In most cases there will be only one INSERT INTO
statement, yet we would be controlling the sync/async behavior with
"*multi-dml*-sync". I think this will confuse a lot of users.

2. TableEnvironment#executeMultiSql(), but this is future work, and we are
also not sure whether we will really introduce it in the future.

I would prefer to introduce this option for the SQL client only. For the
platforms Timo mentioned that need to control such behavior, I think it's
easy and flexible for them to introduce one of their own.

Best,
Kurt


On Sat, Feb 20, 2021 at 10:23 AM Shengkai Fang  wrote:

> Hi everyone.
>
> Sorry for the late response.
>
> For `execution.runtime-mode`, I think it's much better than
> `table.execution.mode`. Thanks for Timo's suggestions!
>
> For `SHOW CREATE TABLE`, I'm +1 with Jark's comments. We should clarify the
> usage of the SHOW CREATE TABLE statements. It should be allowed to specify
> the table that is fully qualified and only works for the table that is
> created by the sql statements.
>
> I have updated the FLIP with suggestions. It seems we have reached a
> consensus, I'd like to start a formal vote for the FLIP.
>
> Please vote +1 to approve the FLIP, or -1 with a comment.
>
> Best,
> Shengkai
>
> Jark Wu  于2021年2月15日周一 下午10:50写道:
>
> > Hi Ingo,
> >
> > 1) I think you are right, the table path should be fully-qualified.
> >
> > 2) I think this is also a good point. The SHOW CREATE TABLE
> > only aims to print DDL for the tables registered using SQL CREATE TABLE
> > DDL.
> > If a table is registered using Table API,  e.g.
> > `StreamTableEnvironment#createTemporaryView(String, DataStream)`,
> > currently it's not possible to print DDL for such tables.
> > I think we should point it out in the FLIP.
> >
> > Best,
> > Jark
> >
> >
> >
> > On Mon, 15 Feb 2021 at 21:33, Ingo Bürk  wrote:
> >
> > > Hi all,
> > >
> > > I have a couple questions about the SHOW CREATE TABLE statement.
> > >
> > > 1) Contrary to the example in the FLIP I think the returned DDL should
> > > always have the table identifier fully-qualified. Otherwise the DDL
> > depends
> > > on the current context (catalog/database), which could be surprising,
> > > especially since "the same" table can behave differently if created in
> > > different catalogs.
> > > 2) How should this handle tables which cannot be fully characterized by
> > > properties only? I don't know if there's an example for this yet, but
> > > hypothetically this is not currently a requirement, right? This isn't
> as
> > > much of a problem if this syntax is SQL-client-specific, but if it's
> > > general Flink SQL syntax we should consider this (one way or another).
> > >
> > >
> > > Regards
> > > Ingo
> > >
> > > On Fri, Feb 12, 2021 at 3:53 PM Timo Walther 
> wrote:
> > >
> > > > Hi Shengkai,
> > > >
> > > > thanks for updating the FLIP.
> > > >
> > > > I have one last comment for the option `table.execution.mode`. Should
> > we
> > > > already use the global Flink option `execution.runtime-mode` instead?
> > > >
> > > > We are using Flink's options where possible (e.g. `pipeline.name`
> and
> > > > `parallism.default`) why not also for batch/streaming mode?
> > > >
> > > > The description of the option matches to the Blink planner behavior:
> > > >
> > > > ```
> > > > Among other things, this controls task scheduling, network shuffle
> > > > behavior, and time semantics.
> > > > ```
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > > On 10.02.21 06:30, Shengkai Fang wrote:
> > > > > Hi, guys.
> > > > >
> > > > > I have updated the FLIP.  It seems we have reached agreement. Maybe
> > we
> > > > can
> > > > > start the vote soon. If anyone has other questions, please leave
> your
> > > > > comments.
> > > > >
> > > > > Best,
> > > > > Shengkai
> > > > >
> > > > > Rui Li 于2021年2月9日 周二下午7:52写道:
> > > > >
> > > > >> Hi guys,
> > > > >>
> > > > >> The conclusion sounds good to me.
> > > > >>
> > > > >> On Tue, Feb 9, 2021 at 5:39 PM Shengkai Fang 
> > > wrote:
> > > > >>
> > > > >>> Hi, Timo, Jark.
> > > > >>>
> > > > >>> I am fine with the new option name.
> > > > >>>
> > > > >>> Best,
> > > > >>> Shengkai
> > > > >>>
> > > > >>> Timo Walther 于2021年2月9日 周二下午5:35写道:
> > > > >>>
> > > >  Yes, `TableEnvironment#executeMultiSql()` can be future work.
> > > > 
> > > >  @Rui, Shengkai: Are you also fine with this conclusion?
> > > > 
> > > >  Thanks,
> > > >  Timo
> > > > 
> > > >  On 09.02.21 10:14, Jark Wu wrote:
> > > > > I'm fine with `table.multi-dml-sync`.
> > > > >
> > > > > My previous concern about "multi" is that DML in CLI looks like
> > > > >> single
> > > > > statement.
> > > > > But we can treat CLI as a multi-line accepting statements from
> > > > >> opening
> > > > >>> to
> > > > > closing.
> > > > 

Re: [VOTE] FLIP-152: Hive Query Syntax Compatibility

2021-02-07 Thread Kurt Young
+1

Best,
Kurt


On Sun, Feb 7, 2021 at 7:24 PM Rui Li  wrote:

> Hi everyone,
>
> I think we have reached some consensus on FLIP-152 [1] in the discussion
> thread [2]. So I'd like to start the vote for this FLIP.
>
> The vote will be open for 72 hours, until Feb. 10 2021 01:00 PM UTC, unless
> there's an objection.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html
>
> --
> Best regards!
> Rui Li
>


Re: [DISCUSS] Releasing Apache Flink 1.12.2

2021-02-05 Thread Kurt Young
Thanks for being our release manager, Yuan.

We found an out-of-memory issue [1] that affects most batch jobs, so I think
it would be great if we could include this fix in 1.12.2.

[1] https://issues.apache.org/jira/browse/FLINK-20663

Best,
Kurt


On Sat, Feb 6, 2021 at 12:36 AM Till Rohrmann  wrote:

> Thanks for volunteering for being our release manager Yuan :-)
>
> +1 for a timely bug fix release.
>
> I will try to review the PR for FLINK- 20417 [1] which is a good fix to
> include in the next bug fix release. We don't have to block the release on
> this fix though.
>
> [1] https://issues.apache.org/jira/browse/FLINK-20417
>
> Cheers,
> Till
>
> On Fri, Feb 5, 2021 at 5:12 PM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > Thanks Yuan for bringing up this topic.
> >
> > +1 for the quick 1.12.2 release.
> >
> > As Yuan mentioned, me and Roman can help whenever committer rights will
> be
> > required.
> >
> > > I am a Firefox user and I just fixed a long lasting bug
> > https://github.com/apache/flink/pull/14848 , wish this would be merged
> in
> > this release.
> >
> > I will push to speed up review of your PR. Let's try to merge it before
> > 1.12.2, but at the same time I wouldn't block the release on this bug.
> >
> > Best,
> > Piotrek
> >
> > pt., 5 lut 2021 o 12:12 郁蓝  napisał(a):
> >
> > > Hi Yuan,
> > >
> > >
> > > I am a Firefox user and I just fixed a long lasting bug
> > > https://github.com/apache/flink/pull/14848 , wish this would be merged
> > in
> > > this release.
> > >
> > >
> > > Best wishes
> > >
> > >
> > >
> > > -- Original Message --
> > > From:
> > >   "dev"
> > > <
> > > yuanmei.w...@gmail.com;
> > > Sent: Friday, Feb 5, 2021, 6:36 PM
> > > To: "dev" > >
> > > Subject: [DISCUSS] Releasing Apache Flink 1.12.2
> > >
> > >
> > >
> > > Hey devs,
> > >
> > > One of the major issues that have not been resolved in Apache Flink
> > 1.12.1
> > > is "unaligned
> > > checkpoint recovery may lead to corrupted data stream"[1]. Since the
> > > problem is now fixed and
> > > it is critical to the users, I would like to kick off a discussion on
> > > releasing Flink 1.12.2 that
> > > includes unaligned checkpoint fixes.
> > >
> > > I would like to volunteer myself for managing this release. But I
> noticed
> > > that some of the release
> > > steps may require committer authorities. Luckily, Piotr and Roman are
> > very
> > > kind to provide help
> > > on such steps.
> > >
> > > Apart from the unaligned checkpoint issue, please let us know in this
> > > thread if there are any
> > > other fixes that we should try to include in this version. I'll try to
> > > communicate with the issue
> > > owners and come up with a time estimation next week.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-20654
> > >
> > > Best
> > > Yuan
> >
>


Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Kurt Young
BTW, I also don't like introducing an option for this case as a first step.

If we can find a default behavior that makes 90% of users happy, we should
go with it. If the remaining 10% of users start to complain about the fixed
behavior (it's also possible that they never complain), we can offer an
option to make them happy. If it turns out that we misjudged users'
expectations, we should change the default behavior.

Best,
Kurt


On Tue, Feb 2, 2021 at 4:46 PM Kurt Young  wrote:

> Hi Timo,
>
> I don't think batch-stream unification can deal with all the cases,
> especially if the query involves some non-deterministic functions.
>
> No matter which option we choose, these queries will have different
> results. For example, if we run the same query in batch mode multiple
> times, it's also highly possible that we get different results. Does that
> mean all the database vendors can't deliver batch-batch unification? I
> don't think so.
>
> What's really important here is the user's intuition. What do users expect
> if they don't read any documentation about these functions? For batch
> users, I think it's already clear enough that all other systems and
> databases evaluate these functions at query start. And for streaming
> users, I have already seen some users expecting these functions to be
> calculated per record.
>
> Thus I think we can make the behavior determined by the execution mode.
> One exception would be PROCTIME(): I think all users would expect this
> function to be calculated for each record. I think SYS_CURRENT_TIMESTAMP
> is similar to PROCTIME(), so we don't have to introduce it.
>
> Best,
> Kurt
>
>
> On Tue, Feb 2, 2021 at 4:20 PM Timo Walther  wrote:
>
>> Hi everyone,
>>
>> I'm not sure if we should introduce the `auto` mode. Taking all the
>> previous discussions around batch-stream unification into account, batch
>> mode and streaming mode should only influence the runtime efficiency and
>> incremental computation. The final query result should be the same in
>> both modes. Also looking into the long-term future, we might drop the
>> mode property and either derive the mode or use different modes for
>> parts of the pipeline.
>>
>> "I think we may need to think more from the users' perspective."
>>
>> I agree here and that's why I actually would like to let the user decide
>> which semantics are needed. The config option proposal was my least
>> favored alternative. We should stick to the standard and behavior of
>> other systems. For both batch and streaming. And use a simple prefix to
>> let users decide whether the semantics are per-record or per-query:
>>
>> CURRENT_TIMESTAMP   -- semantics as all other vendors
>>
>>
>> _CURRENT_TIMESTAMP  -- semantics per record
>>
>> OR
>>
>> SYS_CURRENT_TIMESTAMP  -- semantics per record
>>
>>
>> Please check how other vendors are handling this:
>>
>> SYSDATE  MySql, Oracle
>> SYSDATETIME  SQL Server
>>
>>
>> Regards,
>> Timo
>>
>>
>> On 02.02.21 07:02, Jingsong Li wrote:
>> > +1 for the default "auto" to the "table.exec.time-function-evaluation".
>> >
>> > From the definition of these functions, in my opinion:
>> > - Batch is the instant execution of all records, which is the meaning of
>> > the word "BATCH", so there is only one time at query-start.
>> > - Stream only executes a single record in a moment, so time is
>> generated by
>> > each record.
>> >
>> > On the other hand, we should be more careful about consistency with
>> other
>> > systems.
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Tue, Feb 2, 2021 at 11:24 AM Jark Wu  wrote:
>> >
>> >> Hi Leonard, Timo,
>> >>
>> >> I just did some investigation and found all the other batch processing
>> >> systems
>> >>   evaluate the time functions at query-start, including Snowflake,
>> Hive,
>> >> Spark, Trino.
>> >> I'm wondering whether the default 'per-record' mode will still be
>> weird for
>> >> batch users.
>> >> I know we proposed the option for batch users to change the behavior.
>> >> However if 90% users need to set this config before submitting batch
>> jobs,
>> >> why not
>> >> use this mode for batch by default? For the other 10% special users,
>> they
>> >> can still
>> 

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Kurt Young
more from the users'
> >> perspective.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 1 Feb 2021 at 23:06, Timo Walther  wrote:
> >>
> >>> Hi Leonard,
> >>>
> >>> thanks for considering this issue as well. +1 for the proposed config
> >>> option. Let's start a voting thread once the FLIP document has been
> >>> updated if there are no other concerns?
> >>>
> >>> Thanks,
> >>> Timo
> >>>
> >>>
> >>> On 01.02.21 15:07, Leonard Xu wrote:
> >>>> Hi, all
> >>>>
> >>>> I’ve discussed the time function evaluation further with @Timo and
> >>>> @Jark. We reached a consensus that we’d better address the time
> >>>> function evaluation (function value materialization) in this FLIP as
> >>>> well.
> >>>>
> >>>> We’re fine with introducing an option,
> >>>> table.exec.time-function-evaluation, to control the materialization
> >>>> time point of the time function value. The time functions include:
> >>>> LOCALTIME
> >>>> LOCALTIMESTAMP
> >>>> CURRENT_DATE
> >>>> CURRENT_TIME
> >>>> CURRENT_TIMESTAMP
> >>>> NOW()
> >>>> The default value of table.exec.time-function-evaluation is
> >>>> 'per-record', which means Flink evaluates the function value per
> >>>> record; we recommend this option value for streaming pipelines.
> >>>> Another valid option value is 'query-start', which means Flink
> >>>> evaluates the function value at query start; we recommend this option
> >>>> value for batch pipelines.
> >>>> In the future, more evaluation option values like 'auto' may be
> >>>> supported if there are new requirements, e.g. an 'auto' option which
> >>>> evaluates the time function value per record in streaming mode and at
> >>>> query start in batch mode.
> >>>>
> >>>> Alternative 1:
> >>>>    Introduce a function like CURRENT_TIMESTAMP2/CURRENT_TIMESTAMP_NOW
> >>>> which evaluates the function value at query start. This may confuse
> >>>> users a bit, since we would provide two similar functions with
> >>>> different return values.
> >>>>
> >>>> Alternative 2:
> >>>>    Do not introduce any configuration/function; control the function
> >>>> evaluation by the pipeline execution mode. This may produce different
> >>>> results when users run their streaming pipeline SQL as a batch
> >>>> pipeline (e.g. backfilling), and users also cannot control these
> >>>> functions' behavior.
> >>>>
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Thanks,
> >>>> Leonard
> >>>>
> >>>>
> >>>>> 在 2021年2月1日,18:23,Timo Walther  写道:
> >>>>>
> >>>>> Parts of the FLIP can already be implemented without a completed
> >>> voting, e.g. there is no doubt that we should support TIME(9).
> >>>>>
> >>>>> However, I don't see a benefit of reworking the time functions to
> >>> rework them again later. If we lock the time on query-start the
> >>> implementation of the previsouly mentioned functions will be completely
> >>> different.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 01.02.21 02:37, Kurt Young wrote:
> >>>>>> I also prefer to not expand this FLIP further, but we could open a
> >>>>>> discussion thread
> >>>>>> right after this FLIP being accepted and start coding & reviewing.
> >> Make
> >>>>>> technique
> >>>>>> discussion and coding more pipelined will improve efficiency.
> >>>>>> Best,
> >>>>>> Kurt
> >>>>>> On Sat, Jan 30, 2021 at 3:47 PM Leonard Xu 
> >> wrote:
> >>>>>>> Hi, Timo
> >>>>>>>
> >>>>>>>> I do think that this topic must be part of the FLIP as well. Esp.
> >> if
> >>> the
> >>>>>>> FLIP has the title "time fun

[jira] [Created] (FLINK-21236) Don't explicitly use HeapMemorySegment in row format serde

2021-02-01 Thread Kurt Young (Jira)
Kurt Young created FLINK-21236:
--

 Summary: Don't explicitly use HeapMemorySegment in row format serde
 Key: FLINK-21236
 URL: https://issues.apache.org/jira/browse/FLINK-21236
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Kurt Young
 Fix For: 1.13.0


`RawFormatDeserializationSchema` and `RawFormatSerializationSchema` explicitly
use `HeapMemorySegment`, and in a typical batch job, `HybridMemorySegment`
will also be loaded and used for managed memory. This prevents Class
Hierarchy Analysis (CHA) from optimizing the function calls on MemorySegment. More
details can be found here:
[https://flink.apache.org/news/2015/09/16/off-heap-memory.html]

We can use `ByteBuffer` instead of `HeapMemorySegment`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-01-31 Thread Kurt Young
)
> >>>>>>> 3) I'm fine with supporting all conversion classes like
> >>>>>> java.time.LocalDateTime, java.sql.Timestamp that TimestampType
> supported
> >>>>>> for LocalZonedTimestampType. But we agree that Instant stays the
> default
> >>>>>> conversion class right? The default extraction defined in [2] will
> not
> >>>>>> change, correct?
> >>>>>> Yes, Instant stays the default conversion class. The default
> >>>>>>
> >>>>>>> 4) I would remove the comment "Flink supports TIME-related types
> with
> >>>>>> precision well", because unfortunately this is still not correct.
> We still
> >>>>>> have issues with TIME(9), it would be great if someone can finally
> fix that
> >>>>>> though. Maybe the implementation of this FLIP would be a good time
> to fix
> >>>>>> this issue.
> >>>>>> You’re right, TIME(9) is not supported yet, I'll take account of
> TIME(9)
> >>>>>> to the scope of this FLIP.
> >>>>>>
> >>>>>>
> >>>>>> I’ve updated this FLIP[2] according your suggestions @Jark @Timo
> >>>>>> I’ll start the vote soon if there’re no objections.
> >>>>>>
> >>>>>> Best,
> >>>>>> Leonard
> >>>>>>
> >>>>>> [1]
> >>>>>>
> https://docs.google.com/spreadsheets/d/1T178krh9xG-WbVpN7mRVJ8bzFnaSJx3l-eg1EWZe_X4/edit?usp=sharing
> >>>>>> <
> >>>>>>
> https://docs.google.com/spreadsheets/d/1T178krh9xG-WbVpN7mRVJ8bzFnaSJx3l-eg1EWZe_X4/edit?usp=sharing
> <
> https://docs.google.com/spreadsheets/d/1T178krh9xG-WbVpN7mRVJ8bzFnaSJx3l-eg1EWZe_X4/edit?usp=sharing
> >
> >>>>>>>
> >>>>>> [2]
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
> >
> >>>>>> <
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162:+Consistent+Flink+SQL+time+function+behavior
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162:+Consistent+Flink+SQL+time+function+behavior
> >>
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> On 28.01.21 03:18, Jark Wu wrote:
> >>>>>>>> Thanks Leonard for the further investigation.
> >>>>>>>> I think we all agree we should correct the return value of
> >>>>>>>> CURRENT_TIMESTAMP.
> >>>>>>>> Regarding the return type of CURRENT_TIMESTAMP, I also agree
> >>>>>> TIMESTAMP_LTZ
> >>>>>>>> would be more worldwide useful. This may need more effort, but if
> this
> >>>>>> is
> >>>>>>>> the right direction, we should do it.
> >>>>>>>> Regarding the CURRENT_TIME, if CURRENT_TIMESTAMP returns
> >>>>>>>> TIMESTAMP_LTZ, then I think CURRENT_TIME shouldn't return TIME_TZ.
> >>>>>>>> Otherwise, CURRENT_TIME will be quite special and strange.
> >>>>>>>> Thus I think it has to return TIME type. Given that we already
> have
> >>>>>>>> CURRENT_DATE which returns
> >>>>>>>> DATE WITHOUT TIME ZONE, I think it's fine to return TIME WITHOUT
> TIME
> >>>>>> ZONE
> >>>>>>>> for CURRENT_TIME.
> >>>>>>>> In a word, the updated FLIP looks good to me. I especially like
> the
> >>>>>>>> proposed new function TO_TIMESTAMP_LTZ(numeric, [,scale]).
> >>>>>>>> This will be very convenient to define rowtime on a long value
> which is
> >>>>>> a
> >>>>>>>> very common case and has been complained a lot in mailing list.
> >>>>>>>> Best,
> >>>>>>>> Jark
> >>>>>>>> On Mon, 25 Jan 2021 at 21:12, Kurt Young 
> wrote:
> >>>>>>>>> Thanks Leonard for the detailed response and also the bad case
> about
> >>>>>> option
> >>>>>>>>> 1, thes

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-01-25 Thread Kurt Young
ons
> for this:
> (1) We can forbid CURRENT_TIME as @Timo proposed to make all Flink SQL
> functions follow the standard well,  in this way, we need to offer some
> guidance for user upgrading Flink versions.
> (2) We can also support it from a user's perspective who has used
> CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, btw,Snowflake also returns
> TIME type.
> (3) Returns TIMESTAMP WITH LOCAL TIME ZONE to make it equal to
> CURRENT_TIMESTAMP as Calcite did.
>
> I can imagine (1), since we don't want to leave a bad smell in Flink SQL, and
> I can also accept (2) because I think users do not consider time zone issues
> when they use CURRENT_DATE/CURRENT_TIME, and the timezone info in a time is
> not very useful.
>
> I don’t have a strong opinion on them. What do others think?
>
>
> I hope I've addressed your concerns. @Timo @Kurt
>
> Best,
> Leonard
>
>
>
> > Most of the mature systems have a clear difference between
> CURRENT_TIMESTAMP and LOCALTIMESTAMP. I wouldn't take Spark or Hive as a
> good example. Snowflake decided for TIMESTAMP WITH LOCAL TIME ZONE. As I
> mentioned in the last comment, I could also imagine this behavior for
> Flink. But in any case, there should be some time zone information
> considered in order to cast to all other types.
> >
> > >>> The function CURRENT_DATE/CURRENT_TIME is supporting in SQL
> standard, but
> > >>> LOCALDATE not, I don’t think it’s a good idea that dropping
> functions which
> > >>> SQL standard supported and introducing a replacement which SQL
> standard not
> > >>> reminded.
> >
> > We can still add those functions in the future. But since we don't offer
> a TIME WITH TIME ZONE, it is better to not support this function at all for
> now. And by the way, this is exactly the behavior that also Microsoft SQL
> Server does: it also just supports CURRENT_TIMESTAMP (but it returns
> TIMESTAMP without a zone which completes the confusion).
> >
> > >>> I also agree returning  TIMESTAMP WITH LOCAL TIME ZONE for PROCTIME
> has
> > >>> more clear semantics, but I realized that user didn’t care the type
> but
> > >>> more about the expressed value they saw, and change the type from
> TIMESTAMP
> > >>> to TIMESTAMP WITH LOCAL TIME ZONE brings huge refactor that we need
> > >>> consider all places where the TIMESTAMP type used
> >
> > From a UDF perspective, I think nothing will change. The new type system
> and type inference were designed to support all these cases. There is a
> reason why Java has adopted Joda time, because it is hard to come up with a
> good time library. That's why also we and the other Hadoop ecosystem folks
> have decided for 3 different kinds: LocalDateTime, ZonedDateTime, and
> Instant. It makes the library more complex, but time is a complex topic.
> >
> > I also doubt that many users work with only one time zone. Take the US
> as an example, a country with 3 different timezones. Somebody working with
> US data cannot properly see the data points with just LOCAL TIME ZONE. But
> on the other hand, a lot of event data is stored using a UTC timestamp.
> >
> >
> > >> Before jumping into technique details, let's take a step back to
> discuss
> > >> user experience.
> > >>
> > >> The first important question is what kind of date and time will Flink
> > >> display when users call
> > >>   CURRENT_TIMESTAMP and maybe also PROCTIME (if we think they are
> similar).
> > >>
> > >> Should it always display the date and time in UTC or in the user's
> time
> > >> zone?
> >
> > @Kurt: I think we all agree that the current behavior with just showing
> UTC is wrong. Also, we all agree that when calling CURRENT_TIMESTAMP or
> PROCTIME a user would like to see the time in it's current time zone.
> >
> > As you said, "my wall clock time".
> >
> > However, the question is what is the data type of what you "see". If you
> pass this record on to a different system, operator, or different cluster,
> should the "my" get lost or materialized into the record?
> >
> > TIMESTAMP -> completely lost and could cause confusion in a different
> system
> >
> > TIMESTAMP WITH LOCAL TIME ZONE -> at least the UTC is correct, so you
> can provide a new local time zone
> >
> > TIMESTAMP WITH TIME ZONE -> also "your" location is persisted
> >
> > Regards,
> > Timo
> >
> >
> >
> >
> > On 22.01.21 09:38, Kurt Young wrote:
> >> Forgot on

Re: about flink calcite implement

2021-01-25 Thread Kurt Young
Flink converts all projections and filters into Calc nodes before converting
them to Flink logical nodes, so there is no dedicated physical Filter node.
You can check out FlinkStreamRuleSets.LOGICAL_RULES.
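
For a concrete illustration (the table name and schema below are made up, and
the EXPLAIN syntax may differ slightly depending on the Flink version), you
can check the optimized plan of a simple filter-plus-projection query:

EXPLAIN PLAN FOR
SELECT id, name
FROM my_source
WHERE id > 10;

-- Assumption for illustration: in the optimized plan you should see a single
-- Calc node carrying both the projection and the filter predicate, e.g.
-- something like Calc(select=[id, name], where=[...]), instead of separate
-- LogicalProject/LogicalFilter nodes.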

Best,
Kurt


On Mon, Jan 25, 2021 at 11:03 AM laughing.sh...@qq.com <
laughing.sh...@qq.com> wrote:

>
> In the flink table planner module, I cannot find the rule that converts the
> Calcite Filter or LogicalFilter into a Flink physical node. How does Flink
> implement the filter operator? Is there any class like DataStreamScan that
> implements TableScan?
>
>
> laughing.sh...@qq.com
>


Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-01-22 Thread Kurt Young
Forgot one more thing, continuing on displaying in UTC. As a user, if Flink
wants to display the timestamp in UTC, why don't we offer something like
UTC_TIMESTAMP?

Best,
Kurt


On Fri, Jan 22, 2021 at 4:33 PM Kurt Young  wrote:

> Before jumping into technique details, let's take a step back to discuss
> user experience.
>
> The first important question is what kind of date and time will Flink
> display when users call
>  CURRENT_TIMESTAMP and maybe also PROCTIME (if we think they are similar).
>
> Should it always display the date and time in UTC or in the user's time
> zone? I think this part is the
> reason that surprised lots of users. If we forget about the type and
> internal representation of these
> two methods, as a user, my instinct tells me that these two methods should
> display my wall clock time.
>
> Display time in UTC? I'm not sure why I should care about UTC time; I
> want to get my current timestamp.
> Users who have never gone abroad might not even realize that this is
> affected by the time zone.
>
> Best,
> Kurt
>
>
> On Fri, Jan 22, 2021 at 12:25 PM Leonard Xu  wrote:
>
>> Thanks @Timo for the detailed reply, let's go on this topic on this
>> discussion,  I've merged all mails to this discussion.
>>
>> > LOCALDATE / LOCALTIME / LOCALTIMESTAMP
>> >
>> > --> uses session time zone, returns DATE/TIME/TIMESTAMP
>>
>> >
>> > CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP
>> >
>> > --> uses session time zone, returns DATE/TIME/TIMESTAMP
>> >
>> > I'm very sceptical about this behavior. Almost all mature systems
>> (Oracle, Postgres) and new high quality systems (Presto, Snowflake) use a
>> data type with some degree of time zone information encoded. In a
>> globalized world with businesses spanning different regions, I think we
>> should do this as well. There should be a difference between
>> CURRENT_TIMESTAMP and LOCALTIMESTAMP. And users should be able to choose
>> which behavior they prefer for their pipeline.
>>
>>
>> I know that the two series should be different at first glance, but
>> different SQL engines can have their own interpretations. For example,
>> CURRENT_TIMESTAMP and LOCALTIMESTAMP are synonyms in Snowflake [1] with
>> no difference, and Spark only supports the latter series and doesn’t support
>> LOCALTIME/LOCALTIMESTAMP [2].
>>
>>
>> > If we would design this from scatch, I would suggest the following:
>> >
>> > - drop CURRENT_DATE / CURRENT_TIME and let users pick LOCALDATE /
>> LOCALTIME for materialized timestamp parts
>>
>> The functions CURRENT_DATE/CURRENT_TIME are supported in the SQL standard, but
>> LOCALDATE is not. I don’t think it’s a good idea to drop functions that the
>> SQL standard supports and introduce a replacement that the SQL standard does
>> not mention.
>>
>>
>> > - CURRENT_TIMESTAMP should return a TIMESTAMP WITH TIME ZONE to
>> materialize all session time information into every record. It it the most
>> generic data type and allows to cast to all other timestamp data types.
>> This generic ability can be used for filter predicates as well either
>> through implicit or explicit casting.
>>
>> TIMESTAMP WITH TIME ZONE indeed contains more information to describe a
>> time point, but the type TIMESTAMP can also be cast to all other timestamp data
>> types when combined with the session time zone, and it can also be used for
>> filter predicates. For type casting between BIGINT and TIMESTAMP, I think
>> the function approach using TO_TIMESTAMP()/FROM_UNIXTIMESTAMP() is clearer.
>>
>> > PROCTIME/ROWTIME should be time functions based on a long value. Both
>> System.currentMillis() and our watermark system work on long values. Those
>> should return TIMESTAMP WITH LOCAL TIME ZONE because the main calculation
>> should always happen based on UTC.
>> > We discussed it in a different thread, but we should allow PROCTIME
>> globally. People need a way to create instances of TIMESTAMP WITH LOCAL
>> TIME ZONE. This is not considered in the current design doc.
>> > Many pipelines contain UTC timestamps and thus it should be easy to
>> create one.
>> > Also, both CURRENT_TIMESTAMP and LOCALTIMESTAMP can work with this type
>> because we should remember that TIMESTAMP WITH LOCAL TIME ZONE accepts all
>> timestamp data types as casting target [1]. We could allow TIMESTAMP WITH
>> TIME ZONE in the future for ROWTIME.
>> > In any case, windows should simply adapt their behavior to the passed
>> timestamp type. And with TIMESTAMP WIT

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-01-22 Thread Kurt Young
> >> Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
> >>
> >> +-------------------------+-------------------------+-------------------------+--------------+--------------+
> >> |                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
> >> +-------------------------+-------------------------+-------------------------+--------------+--------------+
> >> | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
> >> +-------------------------+-------------------------+-------------------------+--------------+--------------+
> >>
> >> After the changes, the expected behavior will change to:
> >>
> >> Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
> >>
> >> +-------------------------+-------------------------+-------------------------+--------------+--------------+
> >> |                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
> >> +-------------------------+-------------------------+-------------------------+--------------+--------------+
> >> | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
> >> +-------------------------+-------------------------+-------------------------+--------------+--------------+
> >>
> >> The return type of now(), proctime() and CURRENT_TIMESTAMP will still be
> >> TIMESTAMP;
> >
> > To Kurt, thanks  for the intuitive case, it really clear, you’re wright
> that I want to propose to change the return value of these functions. It’s
> the most important part of the topic from user's perspective.
> >
> >> I think this definitely deserves a FLIP.
> > To Jark,  nice suggestion, I prepared a FLIP for this topic, and will
> start the FLIP discussion soon.
> >
> >>> If use the default Flink SQL, the window time range of the
> >>> statistics is incorrect, then the statistical results will naturally be
> >>> incorrect.
> > To zhisheng, sorry to hear that this problem influenced your production
> jobs,  Could you share your SQL pattern?  we can have more inputs and try
> to resolve them.
> >
> >
> > Best,
> > Leonard
>
>
>
> >  2021-01-21,14:19,Jark Wu  :
> >
> > Great examples to understand the problem and the proposed changes, @Kurt!
> >
> > Thanks Leonard for investigating this problem.
> > The time-zone problems around time functions and windows have bothered a
> > lot of users. It's time to fix them!
> >
> > The return value changes sound reasonable to me, and keeping the return
> > type unchanged will minimize the surprise to the users.
> > Besides that, I think it would be better to mention how this affects the
> > window behaviors, and the interoperability with DataStream.
> >
> > I think this definitely deserves a FLIP.
> >
> > 
> >
> > Hi zhisheng,
> >
> > Do you have examples to illustrate which case will get the wrong window
> > boundaries?
> > That will help to verify whether the proposed changes can solve your
> > problem.
> >
> > Best,
> > Jark
>
>
>
>
> > 2021-01-21,12:54,zhisheng <173855...@qq.com> :
> >
> > Thanks to Leonard Xu for discussing this tricky topic. At present, there
> are many Flink jobs in our production environment that are used to count
> day-level reports (eg: count PV/UV ).
> >
> > If use the default Flink SQL, the window time range of the
> statistics is incorrect, then the statistical results will naturally be
> incorrect.
> >
> > The user needs to deal with the time zone manually in order to solve the
> problem.
> >
> > If Flink itself can solve these time zone issues, then I think it will
> be user-friendly.
> >
> > Thank you
> >
> > Best!;
> > zhisheng
>
>
>
>
> >  2021-01-21,12:11,Kurt Young  :
> >
> > cc this to user & user-zh mailing list because this will affect lots of
> users, and also quite a lot of users
> > were asking questions around this topic.
> >
> > Let me try to understand this from user's perspective.
> >
> > Your proposal will affect five functions, which are:
> > PROCTIME()
> > NOW()
> > CURRENT_DATE
> > CURRENT_TIME
> > CURRENT_TIMESTAMP
> >

Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Kurt Young
cc this to user & user-zh mailing list because this will affect lots of
users, and also quite a lot of users
were asking questions around this topic.

Let me try to understand this from user's perspective.

Your proposal will affect five functions, which are:

   - PROCTIME()
   - NOW()
   - CURRENT_DATE
   - CURRENT_TIME
   - CURRENT_TIMESTAMP

Before the changes, as I am writing this reply, the local time here is
2021-01-21 12:03:35 (Beijing time, UTC+8).
And I tried these 5 functions in the SQL client, and got:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;

+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;

+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

The return type of now(), proctime() and CURRENT_TIMESTAMP will still be
TIMESTAMP;

Best,
Kurt


On Tue, Jan 19, 2021 at 6:42 PM Leonard Xu  wrote:

> I found the above example format may get messed up in different mail clients,
> so I posted a picture here [1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png>
>
>
> > 在 2021年1月19日,16:22,Leonard Xu  写道:
> >
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently some temporal function behaviors are weird to users.
> > 1.  When users use a PROCTIME() in SQL, the value of PROCTIME() has a
> timezone offset with the wall-clock time in users' local time zone, users
> need to add their local time zone offset manually to get expected local
> timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> >
> > 2. Users can not use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP  to get
> wall-clock timestamp in local time zone, and thus they need write UDF in
> their SQL just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> >
> > 3. Another common case  is the time window  with day interval based on
> PROCTIME(), user plan to put all data from one day into the same window,
> but the window is assigned using timestamp in UTC+0 timezone rather than
> the session timezone which leads to the window starts with an offset(e.g:
> Users in China need to add -8h in their business sql start and then +8h
> when output the result, the conversion like a magic for users).
> >
> > These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also know the
> tree timestamp types behavior in Hadoop ecosystem from the reference link
> int the doc.
> >
> >
> > I investigated the current behavior of all Flink time-related functions and
> compared it with other DB vendors like Pg, Presto, Hive, Spark, and Snowflake. I
> made a spreadsheet [2] to organize them well; we can use it for the next
> discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or the function return
> value, or change both the return type and the return value. All of those ways are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP 

[ANNOUNCE] Welcome Guowei Ma as a new Apache Flink Committer

2021-01-19 Thread Kurt Young
Hi everyone,

I'm very happy to announce that Guowei Ma has accepted the invitation to
become a Flink committer.

Guowei is a very long term Flink developer, he has been extremely helpful
with
some important runtime changes, and also been  active with answering user
questions as well as discussing designs.

Please join me in congratulating Guowei for becoming a Flink committer!

Best,
Kurt


Re: [DISCUSS] Backport broadcast operations in BATCH mode to Flink

2021-01-13 Thread Kurt Young
+1

Best,
Kurt


On Thu, Jan 14, 2021 at 12:25 AM Seth Wiesman  wrote:

> +1
>
> I would hope this helps attract more early adopters so if there are issues
> we can resolve them in time for 1.13.
>
> Seth
>
> On Wed, Jan 13, 2021 at 5:13 AM Dawid Wysakowicz 
> wrote:
>
> > Hi,
> >
> > Given that the BATCH execution mode was only released in 1.12 and a
> > rather small impact of the suggested change I'd be ok with backporting
> > it to 1.12.x.
> >
> > Best,
> >
> > Dawid
> >
> > On 07/01/2021 12:50, Kostas Kloudas wrote:
> > > +1 on my side as it does not break anything and it can act as
> motivation
> > > for some people to upgrade.
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Thu, 7 Jan 2021, 12:39 Aljoscha Krettek, 
> wrote:
> > >
> > >> 1.12.x
> > >> Reply-To:
> > >>
> > >> Hi,
> > >>
> > >> what do you think about backporting FLINK-20491 [1] to Flink 1.12.x?
> > >>
> > >> I (we, including Dawid and Kostas) are a bit torn on this.
> > >>
> > >> a) It's a limitation of Flink 1.12.0 and fixing this seems very good
> for
> > >> users that would otherwise have to wait until Flink 1.13.0.
> > >>
> > >> b) It's technically a new feature. We allow something with this change
> > >> where previously an `UnsupportedOperationException` would be thrown.
> > >>
> > >> I would lean towards backporting this to 1.12.x. Thoughts?
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >> [1] https://issues.apache.org/jira/browse/FLINK-20491
> > >>
> > >>
> > >>
> >
> >
>


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Kurt Young
 Sorry for the typo -_-!
I meant idea #2.

Best,
Kurt


On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu  wrote:

> Hi Kurt,
>
> Thx a lot for your feedback. If local aggregation is more like a physical
> operator rather than logical
> operator, I think your suggestion should be idea #2 which handle all in
> the physical optimization phase?
>
> Looking forward for the further discussion.
>
>
> Kurt Young  于2021年1月5日周二 上午9:52写道:
>
>> Local aggregation is more like a physical operator rather than logical
>> operator. I would suggest going with idea #1.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
>> wrote:
>>
>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>> > For (1): Agree: Since we are in the period of upgrading the new table
>> > source api,
>> > we really should consider the new interface for the new optimize rule.
>> If
>> > the new rule
>> > doesn't use the new api, we'll have to upgrade it sooner or later. I
>> have
>> > change to use
>> > the ability interface for the SupportsAggregatePushDown definition in
>> above
>> > proposal.
>> >
>> > For (2): Agree: Change to use CallExpression is a better choice, and
>> have
>> > resolved this
>> > comment in the proposal.
>> >
>> > For (3): I suggest we first support the JDBC connector, as we don't have
>> > Druid connector
>> > and ES connector just has sink api at present.
>> >
>> > But perhaps the biggest question may be whether we should use idea 1 or
>> > idea 2 in proposal.
>> >
>> > What do you think?  After we reach the agreement on the proposal, our
>> team
>> > can drive to
>> > complete this feature.
>> >
>> > Jark Wu  于2020年12月29日周二 下午2:58写道:
>> >
>> > > Hi Sebastian,
>> > >
>> > > Thanks for the proposal. I think this is a great improvement for Flink
>> > SQL.
>> > > I went through the design doc and have following thoughts:
>> > >
>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed a
>> new
>> > >  set of DynamicTableSource interfaces. Could you update your proposal
>> to
>> > > use the new interfaces?
>> > >  Follow the existing ability interfaces, e.g.
>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>> > >
>> > > 2) Personally, I think CallExpression would be a better representation
>> > than
>> > > separate `FunctionDefinition` and args. Because, it would be easier to
>> > know
>> > > what's the index and type of the arguments.
>> > >
>> > > 3) It would be better to list which connectors will be supported in
>> the
>> > > plan?
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > >
>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
>> > wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > I'd like to discuss a new feature for the Blink Planner.
>> > > > Aggregate operator of Flink SQL is currently fully done at Flink
>> layer.
>> > > > With the developing of storage, many downstream storage of Flink SQL
>> > has
>> > > > the ability to deal with Aggregation operator.
>> > > > Pushing down Aggregate to data source layer will improve performance
>> > from
>> > > > the perspective of the network IO and computation overhead.
>> > > >
>> > > > I have drafted a design doc for this new feature.
>> > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>> > > >
>> > > > Any comment or discussion is welcome.
>> > > >
>> > > > --
>> > > >
>> > > > *With kind regards
>> > > > 
>> > > > Sebastian Liu 刘洋
>> > > > Institute of Computing Technology, Chinese Academy of Science
>> > > > Mobile\WeChat: +86—15201613655
>> > > > E-mail: liuyang0...@gmail.com 
>> > > > QQ: 3239559*
>> > > >
>> > >
>> >
>> >
>> > --
>> >
>> > *With kind regards
>> > 
>> > Sebastian Liu 刘洋
>> > Institute of Computing Technology, Chinese Academy of Science
>> > Mobile\WeChat: +86—15201613655
>> > E-mail: liuyang0...@gmail.com 
>> > QQ: 3239559*
>> >
>>
>
>
> --
>
> *With kind regards
> 
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com 
> QQ: 3239559*
>
>


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Kurt Young
Local aggregation is more like a physical operator than a logical
operator. I would suggest going with idea #1.
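
For context, a minimal sketch of the kind of query the proposed
SupportsAggregatePushDown ability targets (the table, connector and column
names here are hypothetical):

SELECT dept_id, COUNT(*) AS cnt, SUM(salary) AS total_salary
FROM jdbc_employees
GROUP BY dept_id;

-- With push-down, the local (partial) aggregation per dept_id could be
-- evaluated by the JDBC source itself, so only pre-aggregated rows cross the
-- network instead of every input row.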

Best,
Kurt


On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu  wrote:

> Hi Jark, Thx a lot for your quick reply and valuable suggestions.
> For (1): Agree: Since we are in the period of upgrading the new table
> source api,
> we really should consider the new interface for the new optimize rule. If
> the new rule
> doesn't use the new api, we'll have to upgrade it sooner or later. I have
> change to use
> the ability interface for the SupportsAggregatePushDown definition in above
> proposal.
>
> For (2): Agree: Change to use CallExpression is a better choice, and have
> resolved this
> comment in the proposal.
>
> For (3): I suggest we first support the JDBC connector, as we don't have
> Druid connector
> and ES connector just has sink api at present.
>
> But perhaps the biggest question may be whether we should use idea 1 or
> idea 2 in proposal.
>
> What do you think?  After we reach the agreement on the proposal, our team
> can drive to
> complete this feature.
>
> Jark Wu  于2020年12月29日周二 下午2:58写道:
>
> > Hi Sebastian,
> >
> > Thanks for the proposal. I think this is a great improvement for Flink
> SQL.
> > I went through the design doc and have following thoughts:
> >
> > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed a new
> >  set of DynamicTableSource interfaces. Could you update your proposal to
> > use the new interfaces?
> >  Follow the existing ability interfaces, e.g.
> > SupportsFilterPushDown, SupportsProjectionPushDown.
> >
> > 2) Personally, I think CallExpression would be a better representation
> than
> > separate `FunctionDefinition` and args. Because, it would be easier to
> know
> > what's the index and type of the arguments.
> >
> > 3) It would be better to list which connectors will be supported in the
> > plan?
> >
> > Best,
> > Jark
> >
> >
> > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to discuss a new feature for the Blink Planner.
> > > Aggregate operator of Flink SQL is currently fully done at Flink layer.
> > > With the developing of storage, many downstream storage of Flink SQL
> has
> > > the ability to deal with Aggregation operator.
> > > Pushing down Aggregate to data source layer will improve performance
> from
> > > the perspective of the network IO and computation overhead.
> > >
> > > I have drafted a design doc for this new feature.
> > >
> > >
> >
> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
> > >
> > > Any comment or discussion is welcome.
> > >
> > > --
> > >
> > > *With kind regards
> > > 
> > > Sebastian Liu 刘洋
> > > Institute of Computing Technology, Chinese Academy of Science
> > > Mobile\WeChat: +86—15201613655
> > > E-mail: liuyang0...@gmail.com 
> > > QQ: 3239559*
> > >
> >
>
>
> --
>
> *With kind regards
> 
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com 
> QQ: 3239559*
>


Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2020-12-06 Thread Kurt Young
Thanks Rui for starting this discussion.

I can see the benefit of improving Hive compatibility further, as quite a
few users are asking for this feature on the mailing lists [1][2][3] and in
online chat tools such as DingTalk.

I have 3 comments regarding the design doc:

a) Could you add a section describing the typical use case you want to
support after this feature is introduced? That way, users can also get an
idea of how to use the feature and what the behavior and outcome will be.

b) Regarding the naming: "BlinkParserFactory", I suggest renaming it to
"FlinkParserFactory".

c) About the two limitations you mentioned:
1. Only works with Hive tables, and the current catalog needs to be a
HiveCatalog.
2. Queries cannot involve tables/views from multiple catalogs.
I assume this is because the Hive parser and analyzer don't support
referring to a name in the "x.y.z" fashion? Since we control all the
behavior by leveraging the code Hive currently uses, is it possible to
remove these limitations? The reason I ask is that I'm not sure users can
make the whole story work purely with the Hive catalog (which is also why I
gave comment #a). If multiple catalogs are involved, I don't think any
meaningful pipeline could be built with this limitation. For example, users
may want to stream data from Kafka to Hive and fully use Hive's dialect,
including the query part, while the Kafka table is a temporary table or
lives in the default in-memory catalog.
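
To make the use case in (c) concrete, here is a hedged sketch (all table
names, schemas and option values are hypothetical; the dialect switch follows
the FLIP-123 style SET command):

SET table.sql-dialect=default;
CREATE TEMPORARY TABLE kafka_orders (
  order_id BIGINT,
  amount DOUBLE,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

SET table.sql-dialect=hive;
-- The open question: can this HiveQL-dialect statement refer to kafka_orders,
-- which lives in the default in-memory catalog rather than the HiveCatalog?
INSERT INTO orders_sink SELECT order_id, amount FROM kafka_orders;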


[1] http://apache-flink.147419.n8.nabble.com/calcite-td9059.html#a9118
[2] http://apache-flink.147419.n8.nabble.com/hive-sql-flink-11-td9116.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-to-in-Flink-to-support-below-HIVE-SQL-td34162.html

Best,
Kurt


On Wed, Dec 2, 2020 at 10:02 PM Rui Li  wrote:

> Hi guys,
>
> I'd like to start a discussion about providing HiveQL compatibility for
> users connecting to a hive warehouse. FLIP-123 has already covered most
> DDLs. So now it's time to complement the other big missing part -- queries.
> With FLIP-152, the hive dialect covers more scenarios and makes it even
> easier for users to migrate to Flink. More details are in the FLIP wiki
> page [1]. Looking forward to your feedback!
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility
>
> --
> Best regards!
> Rui Li
>


Re: Hive Streaming write compaction

2020-11-18 Thread Kurt Young
We just added this feature to 1.12 [1][2]. It would be great if you could
download the 1.12 RC to test it out and give us some feedback.

In case you are wondering why I linked 2 jiras: both the FileSystem and Hive
connectors share the same options and the same implementation.

[1] https://issues.apache.org/jira/browse/FLINK-19875
[2] https://issues.apache.org/jira/browse/FLINK-19886
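
In case it helps with testing, here is a minimal sketch of how the new options
are meant to be used on a filesystem-style sink (schema, path and values are
illustrative only; please double-check the option names against the 1.12 RC
documentation):

CREATE TABLE compacted_sink (
  user_id BIGINT,
  event_time TIMESTAMP(3),
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/compacted_sink',
  'format' = 'parquet',
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);

-- 'auto-compaction' asks the sink to merge the small files produced between
-- checkpoints; 'compaction.file-size' is the target size of the merged files.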

Best,
Kurt


On Thu, Nov 19, 2020 at 2:31 PM Chen Qin  wrote:

> Hi there,
>
> We are testing out writing Kafka to a Hive table in Parquet format.
> Currently, we have seen that users have to create lots of small files in
> minute-level folders to gain latency benefits. I recall folks at FF2020 Global
> mentioned implementing compaction logic at checkpointing time. Wondering
> how that is going? Would love to collaborate on this topic.
>
> Chen
> Pinterest
>


Re: [VOTE] NEW FLIP-104: Add More Metrics to JobManager

2020-10-28 Thread Kurt Young
+1

Best,
Kurt


On Wed, Oct 28, 2020 at 2:44 PM Robert Metzger  wrote:

> Thank you for your UI work Yadong!
>
> +1
>
> On Tue, Oct 27, 2020 at 6:33 PM Matthias Pohl 
> wrote:
>
> > Thanks for restarting the vote, Yadong. I really like your UI proposals.
> > +1 for adding the changes of FLIP-104.
> >
> > Matthias
> >
> > On Tue, Oct 27, 2020 at 10:29 AM Xintong Song 
> > wrote:
> >
> > > Thanks for reviving this FLIP, Yandong.
> > >
> > > The proposed changes look good to me.
> > > +1 for accepting this FLIP.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Oct 27, 2020 at 4:02 PM Yadong Xie 
> wrote:
> > >
> > > > Hi all
> > > >
> > > > I want to start a new vote for FLIP-104, which proposes to add more
> > > metrics
> > > > to the job manager in web UI.
> > > >
> > > > The new FLIP-104 was revisited and adapted following the old ML
> > > discussion
> > > > <
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-104-Add-More-Metrics-to-Jobmanager-td37901.html
> > > > >
> > > > .
> > > >
> > > > The vote will last for at least 72 hours, following the consensus
> > voting.
> > > >
> > > >
> > > > FLIP-104 wiki:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-104%3A+Add+More+Metrics+to+Jobmanager
> > > >
> > > >
> > > > Discussion thread:
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-104-Add-More-Metrics-to-Jobmanager-td37901.html
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-75-Flink-Web-UI-Improvement-Proposal-td33540.html
> > > >
> > > > Thanks,
> > > >
> > > > Yadong
> > > >
> > >
> >
>


Re: [VOTE] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-25 Thread Kurt Young
+1 (binding)

Best,
Kurt


On Mon, Oct 26, 2020 at 11:19 AM Yingjie Cao 
wrote:

> Hi devs,
>
> I'd like to start a vote for FLIP-148: Introduce Sort-Merge Based Blocking
> Shuffle to Flink [1] which is discussed in discussion thread [2].
>
> The vote will last for at least 72 hours until a consensus voting.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> [2]
>
> https://lists.apache.org/thread.html/r11750db945277d944f408eaebbbdc9d595d587fcfb67b015c716404e%40%3Cdev.flink.apache.org%3E
>


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-23 Thread Kurt Young
gt;>>>> [2]:
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > >>>>> [3]:
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
> > >>>>> [4]:
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
> > >>>>> [5]:
> > >>>> https://impala.apache.org/docs/build/html/topics/impala_upsert.html
> > >>>>> [6]:
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
> > >>>>> [7]: https://phoenix.apache.org/atomic_upsert.html
> > >>>>> [8]:
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
> > >>>>>
> > >>>>> On Fri, 23 Oct 2020 at 10:36, Jingsong Li 
> > >>>> wrote:
> > >>>>>
> > >>>>>> The `kafka-cdc` looks good to me.
> > >>>>>> We can even give options to indicate whether to turn on compact,
> > >>>> because
> > >>>>>> compact is just an optimization?
> > >>>>>>
> > >>>>>> - ktable let me think about KSQL.
> > >>>>>> - kafka-compacted it is not just compacted, more than that, it
> still
> > >>>> has
> > >>>>>> the ability of CDC
> > >>>>>> - upsert-kafka , upsert is back, and I don't really want to see it
> > >>>> again
> > >>>>>> since we have CDC
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jingsong
> > >>>>>>
> > >>>>>> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther 
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Jark,
> > >>>>>>>
> > >>>>>>> I would be fine with `connector=upsert-kafka`. Another idea would
> > >>>> be to
> > >>>>>>> align the name to other available Flink connectors [1]:
> > >>>>>>>
> > >>>>>>> `connector=kafka-cdc`.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> [1] https://github.com/ververica/flink-cdc-connectors
> > >>>>>>>
> > >>>>>>> On 22.10.20 17:17, Jark Wu wrote:
> > >>>>>>>> Another name is "connector=upsert-kafka', I think this can solve
> > >>>>> Timo's
> > >>>>>>>> concern on the "compacted" word.
> > >>>>>>>>
> > >>>>>>>> Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify
> > >>>> such
> > >>>>>>> kafka
> > >>>>>>>> sources.
> > >>>>>>>> I think "upsert" is a well-known terminology widely used in many
> > >>>>>> systems
> > >>>>>>>> and matches the
> > >>>>>>>>behavior of how we handle the kafka messages.
> > >>>>>>>>
> > >>>>>>>> What do you think?
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Jark
> > >>>>>>>>
> > >>>>>>>> [1]:
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://mate

Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Kurt Young
e boundedness
> > > >> model = table/stream   -- Some information about interpretation
> > > >> )
> > > >>
> > > >>
> > > >> We can still apply all the constraints mentioned in the FLIP. When
> > > >> `model` is set to `table`.
> > > >>
> > > >> What do you think?
> > > >>
> > > >> Regards,
> > > >> Timo
> > > >>
> > > >>
> > > >> On 21.10.20 14:19, Jark Wu wrote:
> > > >>> Hi,
> > > >>>
> > > >>> IMO, if we are going to mix them in one connector,
> > > >>> 1) either users need to set some options to a specific value
> > > explicitly,
> > > >>> e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc..
> > > >>> This makes the connector awkward to use. Users may face to fix
> > options
> > > >> one
> > > >>> by one according to the exception.
> > > >>> Besides, in the future, it is still possible to use
> > > >>> "sink.partitioner=fixed" (reduce network cost) if users are aware
> of
> > > >>> the partition routing,
> > > >>> however, it's error-prone to have "fixed" as default for compacted
> > > mode.
> > > >>>
> > > >>> 2) or make those options a different default value when
> > > "compacted=true".
> > > >>> This would be more confusing and unpredictable if the default value
> > of
> > > >>> options will change according to other options.
> > > >>> What happens if we have a third mode in the future?
> > > >>>
> > > >>> In terms of usage and options, it's very different from the
> > > >>> original "kafka" connector.
> > > >>> It would be more handy to use and less fallible if separating them
> > into
> > > >> two
> > > >>> connectors.
> > > >>> In the implementation layer, we can reuse code as much as possible.
> > > >>>
> > > >>> Therefore, I'm still +1 to have a new connector.
> > > >>> The "kafka-compacted" name sounds good to me.
> > > >>>
> > > >>> Best,
> > > >>> Jark
> > > >>>
> > > >>>
> > > >>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf 
> > > >> wrote:
> > > >>>
> > > >>>> Hi Kurt, Hi Shengkai,
> > > >>>>
> > > >>>> thanks for answering my questions and the additional
> > clarifications. I
> > > >>>> don't have a strong opinion on whether to extend the "kafka"
> > connector
> > > >> or
> > > >>>> to introduce a new connector. So, from my perspective feel free to
> > go
> > > >> with
> > > >>>> a separate connector. If we do introduce a new connector I
> wouldn't
> > > >> call it
> > > >>>> "ktable" for aforementioned reasons (In addition, we might suggest
> > > that
> > > >>>> there is also a "kstreams" connector for symmetry reasons). I
> don't
> > > >> have a
> > > >>>> good alternative name, though, maybe "kafka-compacted" or
> > > >>>> "compacted-kafka".
> > > >>>>
> > > >>>> Thanks,
> > > >>>>
> > > >>>> Konstantin
> > > >>>>
> > > >>>>
> > > >>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young 
> > wrote:
> > > >>>>
> > > >>>>> Hi all,
> > > >>>>>
> > > >>>>> I want to describe the discussion process which drove us to have
> > such
> > > >>>>> conclusion, this might make some of
> > > >>>>> the design choices easier to understand and keep everyone on the
> > same
> > > >>>> page.
> > > >>>>>
> > > >>>>> Back to the motivation, what functionality do we want to provide
> in
> > > the
> > > >>>>> first place? We got a lot of feedback and
> > > >>>>> questions from mailing lists that people want to 

Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-20 Thread Kurt Young
Hi all,

I want to describe the discussion process which led us to this conclusion;
this might make some of the design choices easier to understand and keep
everyone on the same page.

Back to the motivation: what functionality do we want to provide in the
first place? We got a lot of feedback and questions from the mailing lists
that people want to write not-insert-only messages into Kafka. This might be
intentional or by accident, e.g. writing a non-windowed aggregate query or a
non-windowed left outer join. Some users from the KSQL world also asked why
Flink didn't leverage the key concept of every Kafka topic and treat Kafka
as a dynamically changing keyed table.

To work with Kafka better, we were thinking of extending the functionality
of the current Kafka connector by letting it accept updates and deletions.
But due to the limitations of Kafka, the update has to be "update by key",
i.e. a table with a primary key.

This introduces a couple of conflicts with the current Kafka table's options:
1. key.fields: as said above, we need the Kafka table to have a primary key
constraint, but users can also configure key.fields freely, and this might
cause friction. (Sure, we can do some sanity checks on this, but that also
creates friction.)
2. sink.partitioner: to make the semantics right, we need to make sure all
the updates on the same key are written to the same Kafka partition, so we
should force a hash-by-key partitioner inside such a table. Again, this
conflicts and creates friction with the current user options.

The above issues are solvable, though the result is neither perfect nor the
most user-friendly.

Let's take a look at the reading side. The keyed Kafka table contains two
kinds of messages: upserts and deletions. Upsert means "if the key doesn't
exist yet, it's an insert record; otherwise it's an update record". For the
sake of correctness and simplicity, the Flink SQL engine also needs this
information. If we interpret all messages as update records, some queries or
operators may not work properly; it's weird to see an update record when you
haven't seen the corresponding insert record before.

So after reading the records from such a table, Flink needs to create state
that records which keys have been seen and then generate the correct row
kind accordingly. This couples the state with the data of the message queue,
and it also creates conflicts with the current Kafka connector.

Think about what happens if users suspend a running job (which now contains
some reading state) and then change the start offset of the reader. Changing
the reading offset actually changes the whole story of which records should
be insert messages and which should be update messages. It also makes Flink
deal with another weird situation: it might receive a deletion for a
non-existing message.

We were dissatisfied with all the friction and conflicts that enabling
upsert-and-deletion support in the current Kafka connector would create.
Later we began to realize that we shouldn't treat it as a normal message
queue, but as a changing keyed table. We should always be able to get the
whole data of such a table (by disabling the start offset option), and we
can also read the changelog out of it. It's like an HBase table with binlog
support but without random access capability (which can be fulfilled by
Flink's state).

So instead of telling and persuading users which options they should or
should not use when extending the current Kafka connector with upsert
support, our intention is to create a whole new and different connector that
has totally different abstractions in the SQL layer and should be treated
totally differently from the current Kafka connector.
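
As a rough illustration of what such a keyed table could look like in DDL (a
sketch only; the connector name and options follow the naming discussed in
this thread rather than a finalized API, and all table/field names are made
up):

CREATE TABLE users_keyed (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The Kafka record key carries user_id; a record is interpreted as an upsert
-- on that key and a null value as a deletion, and writes are hash-partitioned
-- by the primary key so all updates for one key land in the same partition.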

Hope this can clarify some of the concerns.

Best,
Kurt


On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang  wrote:

> Hi devs,
>
> As many people are still confused about the different option behaviours
> between the Kafka connector and the KTable connector, Jark and I have listed
> the differences in the doc [1].
>
> Best,
> Shengkai
>
> [1]
>
> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
>
> Shengkai Fang  于2020年10月20日周二 下午12:05写道:
>
> > Hi Konstantin,
> >
> > Thanks for your reply.
> >
> > > It uses the "kafka" connector and does not specify a primary key.
> > The dimensional table `users` is a ktable connector and we can specify
> the
> > pk on the KTable.
> >
> > > Will it possible to use a "ktable" as a dimensional table in FLIP-132
> > Yes. We can specify the watermark on the KTable and it can be used as a
> > dimension table in temporal join.
> >
> > >Introduce a new connector vs introduce a new property
> > The main reason behind is that the KTable connector almost has no common
> > options with the Kafka connector. The options that can be reused by
> KTable
> > connectors are 'topic', 'properties.bootstrap.servers' and
> > 'value.fields-include' . We can't set cdc format for 

Re: [DISCUSS] Release 1.12 Feature Freeze

2020-10-19 Thread Kurt Young
Can we change the freeze date to October 30th (Friday next week)? It would
be helpful for us to have 2 more days.

Best,
Kurt


On Mon, Oct 19, 2020 at 5:00 PM Robert Metzger  wrote:

> Hi all,
>
> Dian and I would like to discuss a few items regarding the upcoming Flink
> 1.12 feature freeze:
>
> *A) Exact feature freeze day*
> So far, we've always said "end of October
> " for the
> freeze. We propose (end of day CEST) October 28th (Wednesday next week) as
> the feature freeze time.
> We want to create RC0 on the day after the feature freeze, to make sure the
> RC creation process is running smoothly, and to have a common testing
> reference point.
>
>
>
> *B) What does feature freeze mean?*After the feature freeze, no new
> features are allowed to be merged to master. Only bug fixes and
> documentation improvements.
> The release managers will revert new feature commits after the feature
> freeze.
> Rational: The goal of the feature freeze phase is to improve the system
> stability by addressing known bugs. New features tend to introduce new
> instabilities, which would prolong the release process.
> If you need to merge a new feature after the freeze, please open a
> discussion on the dev@ list. If there are no objections by a PMC member
> within 48 (workday) hours, the feature can be merged.
>
> *C) When to cut the "release-1.12" branch off master?*
> In the last feature freeze, we had a pretty lengthy phase of maintaining
> the "master" and "release-1.11" branches with the same fixes. Therefore, I
> would like to propose an adjustment to the release process: We will have a
> stabilization phase on master, between the feature freeze and the branch
> cut.
> I expect this stabilization phase to last between 1 and 3 weeks, depending
> on the issues we find. Once all blockers are resolved, and no new blockers
> are surfacing, we can cut off the "release-1.12" branch and finalize the
> release.
> Is anybody in the community waiting for the cut off to happen sooner so
> that they can merge a big feature to Flink 1.13 ? (if that would be the
> case, then we can not have a stabilization phase)
>
>
> Let me know what you think!
>
> Best,
> Dian and Robert
>


Re: [ANNOUNCE] New PMC member: Zhu Zhu

2020-10-09 Thread Kurt Young
Congratulations, Zhu Zhu!

Best,
Kurt


On Sat, Oct 10, 2020 at 11:03 AM Yang Wang  wrote:

> Congratulations! Zhu Zhu.
>
> Best,
> Yang
>
> Xintong Song  于2020年10月9日周五 下午3:35写道:
>
> > Congratulations, Zhu~!
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Oct 9, 2020 at 3:17 PM Jingsong Li 
> wrote:
> >
> > > Congratulations, Zhu Zhu!
> > >
> > > On Fri, Oct 9, 2020 at 3:08 PM Zhijiang  > > .invalid>
> > > wrote:
> > >
> > > > Congratulations and welcome, Zhu Zhu!
> > > >
> > > > Best,
> > > > Zhijiang
> > > > --
> > > > From:Yun Tang 
> > > > Send Time:2020年10月9日(星期五) 14:20
> > > > To:dev@flink.apache.org 
> > > > Subject:Re: [ANNOUNCE] New PMC member: Zhu Zhu
> > > >
> > > > Congratulations, Zhu!
> > > >
> > > > Best
> > > > Yun Tang
> > > > 
> > > > From: Danny Chan 
> > > > Sent: Friday, October 9, 2020 13:51
> > > > To: dev@flink.apache.org 
> > > > Subject: Re: [ANNOUNCE] New PMC member: Zhu Zhu
> > > >
> > > > Congrats, Zhu Zhu ~
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2020年10月9日 +0800 PM1:05,dev@flink.apache.org,写道:
> > > > >
> > > > > Congrats, Zhu Zhu
> > > >
> > > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>


Re: [VOTE] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-24 Thread Kurt Young
+1 (binding)

Best,
Kurt


On Thu, Sep 24, 2020 at 4:01 PM Timo Walther  wrote:

> Hi all,
>
> after the discussion in [1], I would like to open a second voting thread
> for FLIP-136 [2] which covers different topic to improve the
> back-and-forth communication between DataStream API and Table API.
>
> The vote will be open until September 29th (72h + weekend), unless there
> is an objection or not enough votes.
>
> Regards,
> Timo
>
> [1]
>
> https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
>


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

2020-09-24 Thread Kurt Young
Yeah, JDBC is definitely a popular use case we should consider.

Best,
Kurt


On Thu, Sep 24, 2020 at 4:11 PM Flavio Pompermaier 
wrote:

> Hi Kurt, in the past we had a very interesting use case in this regard: our
> customer's (Oracle) db was quite big, and running too many queries in parallel
> was too heavy and was causing the queries to fail.
> So we had to limit the source parallelism to 2 threads. After the fetching
> of the data, the other operators could use the max parallelism as usual.
>
> Best,
> Flavio
>
> On Thu, Sep 24, 2020 at 9:59 AM Kurt Young  wrote:
>
> > Thanks Jingsong for driving this, this is indeed a useful feature and
> lots
> > of users are asking for it.
> >
> > For setting a fixed source parallelism, I'm wondering whether this is
> > necessary. For Kafka,
> > I can imagine users would expect Flink to use the number of partitions
> as
> > the parallelism. If it's too
> > large, one can use the max parallelism to make it smaller.
> > But for ES, which doesn't have the ability to decide a reasonable parallelism
> > on its own, it might make sense
> > to introduce a user-specified parallelism for such a table source.
> >
> > So I think it would be better to reorganize the document a little bit, to
> > explain the connectors one by one. Briefly
> > introduce use cases and what kind of options are needed in your opinion.
> >
> > Regarding the interface `DataStreamScanProvider`, a concrete example
> would
> > help the discussion. What kind
> > of scenarios do you want to support? And what kind of connectors need
> such
> > an interface?
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 23, 2020 at 9:30 PM admin <17626017...@163.com> wrote:
> >
> > > +1,it’s a good news
> > >
> > > > 2020年9月23日 下午6:22,Jingsong Li  写道:
> > > >
> > > > Hi all,
> > > >
> > > > I'd like to start a discussion about improving the new TableSource
> and
> > > > TableSink interfaces.
> > > >
> > > > Most connectors have been migrated to FLIP-95, but there are still
> the
> > > > Filesystem and Hive that have not been migrated. They have some
> > > > requirements on table connector API. And users also have some
> > additional
> > > > requirements:
> > > > - Some connectors have the ability to infer parallelism, the
> > parallelism
> > > is
> > > > good for most cases.
> > > > - Users have customized parallelism configuration requirements for
> > source
> > > > and sink.
> > > > - The connectors need to use topology to build their source/sink
> > instead
> > > of
> > > > a single function. Like JIRA[1], Partition Commit feature and File
> > > > Compaction feature.
> > > >
> > > > Details are in [2].
> > > >
> > > > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> > > >
> > > > Best,
> > > > Jingsong
> > >
> > >
>


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

2020-09-24 Thread Kurt Young
Thanks Jingsong for driving this, this is indeed a useful feature and lots
of users are asking for it.

For setting a fixed source parallelism, I'm wondering whether this is
necessary. For Kafka,
I can imagine users would expect Flink to use the number of partitions as
the parallelism. If it's too
large, one can use the max parallelism to make it smaller.
But for ES, which doesn't have the ability to decide a reasonable parallelism
on its own, it might make sense
to introduce a user-specified parallelism for such a table source.

So I think it would be better to reorganize the document a little bit, to
explain the connectors one by one. Briefly
introduce use cases and what kind of options are needed in your opinion.

Regarding the interface `DataStreamScanProvider`, a concrete example would
help the discussion. What kind
of scenarios do you want to support? And what kind of connectors need such
an interface?
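
To make the ES case concrete, here is a rough sketch of how a user-specified
source parallelism could surface in the DDL. The connector name and the
'scan.parallelism' option are only placeholders for this discussion, not
something the FLIP has settled on.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SourceParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Kafka-like sources can infer parallelism from the number of partitions;
        // an ES-like source has nothing to infer from, so the user states it.
        tEnv.executeSql(
                "CREATE TABLE es_source (" +
                "  id BIGINT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'my-es-source'," + // hypothetical connector name
                "  'scan.parallelism' = '2'" +      // placeholder option name
                ")");
    }
}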

Best,
Kurt


On Wed, Sep 23, 2020 at 9:30 PM admin <17626017...@163.com> wrote:

> +1,it’s a good news
>
> > 2020年9月23日 下午6:22,Jingsong Li  写道:
> >
> > Hi all,
> >
> > I'd like to start a discussion about improving the new TableSource and
> > TableSink interfaces.
> >
> > Most connectors have been migrated to FLIP-95, but there are still the
> > Filesystem and Hive that have not been migrated. They have some
> > requirements on table connector API. And users also have some additional
> > requirements:
> > - Some connectors have the ability to infer parallelism, the parallelism
> is
> > good for most cases.
> > - Users have customized parallelism configuration requirements for source
> > and sink.
> > - The connectors need to use topology to build their source/sink instead
> of
> > a single function. Like JIRA[1], Partition Commit feature and File
> > Compaction feature.
> >
> > Details are in [2].
> >
> > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> >
> > Best,
> > Jingsong
>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-23 Thread Kurt Young
I see, I missed the part that a row is either in positioned mode or named
mode.
I can live with this. Thanks.

Best,
Kurt


On Wed, Sep 23, 2020 at 9:07 PM Timo Walther  wrote:

> But the examples you mentioned would not be different.
>
> By calling `Row.withNames()`, the row has no definition of position. All
> position-based methods would throw an exception.
>
> The hashCode()/equals() would return true for:
>
>  > Row row1 = Row.withNames();
>  > row.setField("a", 1);
>  > row.setField("b", 2);
>  >
>  > Row row2 = Row.withNames();
>  > row.setField("b", 2);
>  > row.setField("a", 1);
>
> row2.equals(row1)
>
> The row is just a container for the serializer/converter which will
> ensure ordering.
>
> Regards,
> Timo
>
> On 23.09.20 15:00, Kurt Young wrote:
> > Thanks for the detailed response, 1-5 sounds good to me.
> >
> > For #6, I just think of another case which would also annoy users.
> Consider
> > code like this:
> >
> > Row row = Row.withNames();
> > row.setField("a", 1);
> > row.setField("b", 2);
> >
> > and for second time, he changes the sequence of setting method calls:
> >
> > Row row = Row.withNames();
> > row.setField("b", 2);
> > row.setField("a", 1);
> >
> > I don't think anyone would expect these two rows are actually different.
> >
> > Instead, if we at least define the field names first, which will fix the
> > order, we would not have such side effects.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 23, 2020 at 8:47 PM Timo Walther  wrote:
> >
> >> Hi Kurt,
> >>
> >> thanks for your feedback.
> >>
> >> 1. "moving Schema after DataStream": I don't have a strong opinion here.
> >> One could argue that the API would look similar to a CREATE TABLE
> >> statement: first schema then connector. I updated the FLIP.
> >>
> >> 2. "will we do some verification?"
> >> Yes, we will definitely do verification. It will happen based on what is
> >> available in TypeInformation.
> >>
> >> "if T is a Tuple, do we have some rules for setting field names in
> Schema?"
> >> The rule in this case would be to take the
> >> TupleTypeInfoBase.getFieldNames() similar to the logic we currently
> have.
> >>
> >> "Will we do some type coercion?"
> >> For `fromDataStream()`, type coercion between an explicitly specified
> >> Schema and DataStream will not happen (e.g. DataStream !=
> >> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
> >> desired data type explicitly and expects correctness.
> >> For `toDataStream()`, it has similar type coercion semantics as a
> >> regular table sink (first on a logical level, then on a class level).
> >>
> >> It is difficult to list all type rules upfront, but it should behave
> >> similar to all the work done in FLIP-65 and FLIP-95. I would move the
> >> discussion about other type handling to the individual PRs. The general
> >> goal should be to stay backwards compatible but reduce manual schema
> work.
> >>
> >> 3. "How do you derive schema from DataStream"
> >>
> >> We use RowTypeInfo (if DataStream comes from DataStream API) or
> >> ExternalTypeInfo (if DataStream comes from Table API).
> >>
> >> 4. "toDataStream(AbstractDataType, Table) I'm wondering whether this
> >> method is necessary"
> >> Dealing with Row in DataStream API is very inconvenient. With the new
> >> data format converters, the behavior would be consistent across
> >> DataStream API and Table functions. The logic is already present and
> >> seems to be pretty stable so far. We would break a lot of existing code
> >> if we get rid of this method.
> >>
> >> 5. "How does Row behave like GenericRowData?"
> >>
> >> Row can contain StringData or further nested RowData. The data format
> >> converters support that. The conversion of fields would be a no-op in
> >> this case. In the end, both Row and GenericRowData just stored an
> Object[].
> >>
> >> 6. "They would expect that all the fields they didn't set should be
> NULL."
> >>
> >> But this will be the case. The full list of all field names and their
> >> order is defined by the data type, not the R

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-23 Thread Kurt Young
Thanks for the detailed response, 1-5 sounds good to me.

For #6, I just thought of another case which would also annoy users. Consider
code like this:

Row row = Row.withNames();
row.setField("a", 1);
row.setField("b", 2);

and the second time, he changes the sequence of the setter calls:

Row row = Row.withNames();
row.setField("b", 2);
row.setField("a", 1);

I don't think anyone would expect these two rows to actually be different.

Instead, if we at least define the field names first, which will fix the
order, we would not have such side effects.

Best,
Kurt


On Wed, Sep 23, 2020 at 8:47 PM Timo Walther  wrote:

> Hi Kurt,
>
> thanks for your feedback.
>
> 1. "moving Schema after DataStream": I don't have a strong opinion here.
> One could argue that the API would look similar to a CREATE TABLE
> statement: first schema then connector. I updated the FLIP.
>
> 2. "will we do some verification?"
> Yes, we will definitely do verification. It will happen based on what is
> available in TypeInformation.
>
> "if T is a Tuple, do we have some rules for setting field names in Schema?"
> The rule in this case would be to take the
> TupleTypeInfoBase.getFieldNames() similar to the logic we currently have.
>
> "Will we do some type coercion?"
> For `fromDataStream()`, type coercion between an explicitly specified
> Schema and DataStream will not happen (e.g. DataStream !=
> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
> desired data type explicitly and expects correctness.
> For `toDataStream()`, it has similar type coercion semantics as a
> regular table sink (first on a logical level, then on a class level).
>
> It is difficult to list all type rules upfront, but it should behave
> similar to all the work done in FLIP-65 and FLIP-95. I would move the
> discussion about other type handling to the individual PRs. The general
> goal should be to stay backwards compatible but reduce manual schema work.
>
> 3. "How do you derive schema from DataStream"
>
> We use RowTypeInfo (if DataStream comes from DataStream API) or
> ExternalTypeInfo (if DataStream comes from Table API).
>
> 4. "toDataStream(AbstractDataType, Table) I'm wondering whether this
> method is necessary"
> Dealing with Row in DataStream API is very inconvenient. With the new
> data format converters, the behavior would be consistent across
> DataStream API and Table functions. The logic is already present and
> seems to be pretty stable so far. We would break a lot of existing code
> if we get rid of this method.
>
> 5. "How does Row behave like GenericRowData?"
>
> Row can contain StringData or further nested RowData. The data format
> converters support that. The conversion of fields would be a no-op in
> this case. In the end, both Row and GenericRowData just stored an Object[].
>
> 6. "They would expect that all the fields they didn't set should be NULL."
>
> But this will be the case. The full list of all field names and their
> order is defined by the data type, not the Row instance. During
> serialization/conversion we can reorder fields, throw exceptions about
> unknown field names, and set remaining fields to NULL.
>
> If a user uses `new Row(5)` but the serializer is configured by a data
> type that only supports `Row(3)`, it will also throw an exception during
> runtime. We cannot guard users from creating invalid rows. But the
> strongly typed serializers/converters will do the final verification.
>
> Regards,
> Timo
>
>
> On 23.09.20 12:08, Kurt Young wrote:
> > Sorry for being late, I went through the design doc and here are
> > my comments:
> >
> > 1. A minor one, how about moving Schema after DataStream in all affected
> > APIs? Such as:
> > StreamTableEnvironment.fromDataStream(Schema, DataStream): Table
> > StreamTableEnvironment.createTemporaryView(String, Schema,
> DataStream):
> > Unit
> > StreamTableEnvironment.fromChangelogStream(Schema, DataStream):
> Table
> > StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream
> >
> > It will look more aligned with APIs which don't have Schema. For example:
> > StreamTableEnvironment.fromDataStream(DataStream): Table
> > StreamTableEnvironment.fromDataStream(DataStream, Schema): Table
> >
> > 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
> > DataStream): Table
> > How do we convert the types between Schema and T, will we do some
> > verification? Will we do some type coercion? For example,
> > can we support Schema.LONG with DataStream? And if T is a Tuple,
> > do we have some ru

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-23 Thread Kurt Young
Sorry for being late, I went through the design doc and here are
my comments:

1. A minor one, how about moving Schema after DataStream in all affected
APIs? Such as:
StreamTableEnvironment.fromDataStream(Schema, DataStream): Table
StreamTableEnvironment.createTemporaryView(String, Schema, DataStream):
Unit
StreamTableEnvironment.fromChangelogStream(Schema, DataStream): Table
StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream

It will look more aligned with APIs which don't have Schema. For example:
StreamTableEnvironment.fromDataStream(DataStream): Table
StreamTableEnvironment.fromDataStream(DataStream, Schema): Table

2. A question to: StreamTableEnvironment.fromDataStream(Schema,
DataStream): Table
How do we convert the types between Schema and T, will we do some
verification? Will we do some type coercion? For example,
can we support Schema.LONG with DataStream? And if T is a Tuple,
do we have some rules for setting field names in Schema?
I can see lots of imagination from this method but the rules are unclear to
me.

3. A question to: StreamTableEnvironment.fromChangelogStream(DataStream):
Table
How do you derive schema from DataStream?

4. A question to: StreamTableEnvironment.toDataStream(AbstractDataType,
Table): DataStream
I'm wondering whether this method is necessary. Always getting a
DataStream<Row> from the table and then manually applying a
map function doesn't seem cumbersome and is safer (such intelligent
conversion always seems error-prone to me); see the sketch at the end of
this mail.

5.
> The `toChangelogStream(Schema, Table)` exists for completeness to have a
symmetric API.
> It allows for declaring the data type for output similar to
DynamicTableSinks.
> Additionally, internal structures such as StringData, TimestampData can
still be used by power users.
> In that sense, Row can behave like a GenericRowData.

How does Row behave like GenericRowData? I don't think Row can work with
RowData for now.

6. Row.withNames() seems dangerous to me. It relies on users setting all the
fields they need during `setField(String name, T value)`.
It's also highly possible that users would not set certain fields when, for
example, some fields are NULL. They would expect that all the fields
they didn't set should be NULL.
Row.withNames(String[] fieldNames) or Row.withNames(List<String>
fieldNames) seems to be a safer choice.
I agree that simplicity is important but making the API safer to use is also
important.
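
For point 4, here is a minimal sketch of that "plain map" alternative using
the APIs available today (toAppendStream); the sample data and types are made
up:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PlainMapAlternative {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table table = tEnv.fromDataStream(env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b")));

        // Get a DataStream<Row> and convert it explicitly with a map function,
        // instead of relying on an implicit conversion to the target type.
        DataStream<Tuple2<Integer, String>> result =
                tEnv.toAppendStream(table, Row.class)
                        .map(row -> Tuple2.of((Integer) row.getField(0), (String) row.getField(1)))
                        .returns(Types.TUPLE(Types.INT, Types.STRING));

        result.print();
        env.execute();
    }
}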

Best,
Kurt


On Wed, Sep 23, 2020 at 4:15 PM Timo Walther  wrote:

> Hi Jark,
>
> thanks for your feedback. I removed `withNamesAndPositions` from the
> public API list and added a comment that this is only internal API for
> converters and serializers.
>
> I would start a new vote tomorrow if there are no objections.
>
> What do you think?
>
> Regards,
> Timo
>
> On 23.09.20 08:55, Jark Wu wrote:
> > Hi Timo,
> >
> > Sorry for the late reply.
> > I think it would be great if we can make `withNamesAndPositions` internal
> > visible. This reduces the complexity of the public API.
> > It's hard to come up with a perfect solution. So let's move on this FLIP.
> > I don't have other concerns.
> >
> > Best,
> > Jark
> >
> > On Fri, 18 Sep 2020 at 22:14, Timo Walther  wrote:
> >
> >> Hi Jark,
> >>
> >> the fieldNames map is not intended for users. I would also be fine to
> >> make it a default scope constructor and access it with some internal
> >> utility class next to the Row class. The fieldNames map must only be
> >> used by serializers and converters. A user has no benefit in using it.
> >>
> >> For the creation of new rows (without reusing, which only advanced users
> >> usually do), I don't see a benefit of having:
> >>
> >> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >> reuse.setField("myField", 12);
> >> reuse.setField("myOtherField", "This is a test");
> >>
> >> The purpose of Row.withName() is to create a Row easily and readable
> >> without declaring 50+ column names or dealing with indices in this
> range.
> >>
> >> Personally, I would like to make Row an interface and have concrete row
> >> implementations for different purposes but this would break existing
> >> programs too much.
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 18.09.20 11:04, Jark Wu wrote:
> >>> Personally I think the fieldNames Map is confusing and not handy.
> >>> I just have an idea but not sure what you think.
> >>> What about adding a new constructor with List field names, this enables
> >> all
> >>> name-based setter/getters.
> >>> Regarding to List -> Map cost for every record, we can suggest users to
> >>> reuse the Row in the task.
> >>>
> >>> new Row(int arity)
> >>> new Row(List fieldNames)
> >>>
> >>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>> reuse.setField("myField", 12);
> >>> reuse.setField("myOtherField", "This is a test");
> >>>
> >>> My point is that, if we can have a handy constructor for named Row, we
> >> may
> >>> not need to distinguish the 

Re: Re: [ANNOUNCE] New Apache Flink Committer - Godfrey He

2020-09-16 Thread Kurt Young
Congrats and welcome, Godfrey!

Best,
Kurt


On Wed, Sep 16, 2020 at 4:59 PM Xintong Song  wrote:

> Congratulations, Godfrey~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Sep 16, 2020 at 3:28 PM Yu Li  wrote:
>
> > Congrats and welcome, Godfrey!
> >
> > Best Regards,
> > Yu
> >
> >
> > On Wed, 16 Sep 2020 at 15:18, 刘大龙  wrote:
> >
> > >
> > > Congratulations!
> > >
> > > > -原始邮件-
> > > > 发件人: "Benchao Li" 
> > > > 发送时间: 2020-09-16 14:22:25 (星期三)
> > > > 收件人: dev 
> > > > 抄送: "贺小令" 
> > > > 主题: Re: [ANNOUNCE] New Apache Flink Committer - Godfrey He
> > > >
> > > > Congratulations!
> > > >
> > > > Zhu Zhu  于2020年9月16日周三 下午1:36写道:
> > > >
> > > > > Congratulations!
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Leonard Xu  于2020年9月16日周三 下午1:32写道:
> > > > >
> > > > > > Congratulations! Godfrey
> > > > > >
> > > > > > Best,
> > > > > > Leonard
> > > > > >
> > > > > > > 在 2020年9月16日,13:12,Yangze Guo  写道:
> > > > > > >
> > > > > > > Congratulations! Xiaoling.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Wed, Sep 16, 2020 at 12:45 PM Dian Fu <
> dian0511...@gmail.com>
> > > > > wrote:
> > > > > > >>
> > > > > > >> Congratulations, well deserved!
> > > > > > >>
> > > > > > >> Regards,
> > > > > > >> Dian
> > > > > > >>
> > > > > > >>> 在 2020年9月16日,下午12:36,Guowei Ma  写道:
> > > > > > >>>
> > > > > > >>> Congratulations :)
> > > > > > >>>
> > > > > > >>> Best,
> > > > > > >>> Guowei
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Wed, Sep 16, 2020 at 12:19 PM Jark Wu 
> > > wrote:
> > > > > > >>>
> > > > > >  Hi everyone,
> > > > > > 
> > > > > >  It's great seeing many new Flink committers recently, and on
> > > behalf
> > > > > > of the
> > > > > >  PMC,
> > > > > >  I'd like to announce one more new committer: Godfrey He.
> > > > > > 
> > > > > >  Godfrey is a very long time contributor in the Flink
> community
> > > since
> > > > > > the
> > > > > >  end of 2016.
> > > > > >  He has been a very active contributor in the Flink SQL
> > component
> > > > > with
> > > > > > 153
> > > > > >  PRs and more than 571,414 lines which is quite outstanding.
> > > > > >  Godfrey has paid essential effort with SQL optimization and
> > > helped a
> > > > > > lot
> > > > > >  during the blink merging.
> > > > > >  Besides that, he is also quite active with community work
> > > especially
> > > > > > in
> > > > > >  Chinese mailing list.
> > > > > > 
> > > > > >  Please join me in congratulating Godfrey for becoming a
> Flink
> > > > > > committer!
> > > > > > 
> > > > > >  Cheers,
> > > > > >  Jark Wu
> > > > > > 
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Kurt Young
>timestamp INT METADATA"
>> > > > >
>> > > > > I really like the proposal, there is no confusion with computed
>> > > > > column any
>> > > > > more,  and it’s concise enough.
>> > > > >
>> > > > >
>> > > > > @Timo @Dawid
>> > > > > “We use `SYSTEM_TIME` for temporal tables. I think prefixing with
>> SYSTEM
>> > > > > makes it clearer that it comes magically from the system.”
>> > > > > “As for the issue of shortening the SYSTEM_METADATA to METADATA.
>> Here I
>> > > > > very much prefer the SYSTEM_ prefix.”
>> > > > >
>> > > > > I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot,
>> > > > > First of all,  the word `TIME` has broad meanings but the word
>> > > > > `METADATA `
>> > > > > not,  `METADATA ` has specific meaning,
>> > > > > Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
>> > > > > `SYSTEM_METADATA ` not.
>> > > > > Personally, I like more simplify way,sometimes  less is more.
>> > > > >
>> > > > >
>> > > > > Best,
>> > > > > Leonard
>> > > > >
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > Timo Walther  于2020年9月9日周三 下午6:41写道:
>> > > > > >
>> > > > > > > Hi everyone,
>> > > > > > >
>> > > > > > > "key" and "value" in the properties are a special case
>> because they
>> > > > > > > need
>> > > > > > > to configure a format. So key and value are more than just
>> metadata.
>> > > > > > > Jark's example for setting a timestamp would work but as the
>> FLIP
>> > > > > > > discusses, we have way more metadata fields like headers,
>> > > > > > > epoch-leader,
>> > > > > > > etc. Having a property for all of this metadata would mess up
>> the WITH
>> > > > > > > section entirely. Furthermore, we also want to deal with
>> metadata from
>> > > > > > > the formats. Solving this through properties as well would
>> further
>> > > > > > > complicate the property design.
>> > > > > > >
>> > > > > > > Personally, I still like the computed column design more
>> because it
>> > > > > > > allows to have full flexibility to compute the final column:
>> > > > > > >
>> > > > > > > timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS
>> > > > > TIMESTAMP(3)))
>> > > > > > >
>> > > > > > > Instead of having a helper column and a real column in the
>> table:
>> > > > > > >
>> > > > > > > helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>> > > > > > > realTimestamp AS adjustTimestamp(helperTimestamp)
>> > > > > > >
>> > > > > > > But I see that the discussion leans towards:
>> > > > > > >
>> > > > > > > timestamp INT SYSTEM_METADATA("ts")
>> > > > > > >
>> > > > > > > Which is fine with me. It is the shortest solution, because
>> we don't
>> > > > > > > need additional CAST. We can discuss the syntax, so that
>> confusion
>> > > > > > > with
>> > > > > > > computed columns can be avoided.
>> > > > > > >
>> > > > > > > timestamp INT USING SYSTEM_METADATA("ts")
>> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts")
>> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>> > > > > > >
>> > > > > > > We use `SYSTEM_TIME` for temporal tables. I think prefixing
>> with
>> > > > > > > SYSTEM
>> > > > > > > makes it clearer that it comes magically from the system.
>> > > > > > >
>> > > > > > > What do you think?
>> > > > > > >
>> >

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Kurt Young
Yes, I didn't intend to block this FLIP, and some of the comments are
actually implementation details.
And all of them are handled internally, not visible to users, thus we can
also change or improve them
in the future.

Best,
Kurt


On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek  wrote:

> I think Kurts concerns/comments are very valid and we need to implement
> such things in the future. However, I also think that we need to get
> started somewhere and I think what's proposed in this FLIP is a good
> starting point that we can build on. So we should not get paralyzed by
> thinking too far ahead into the future. Does that make sense?
>
> Best,
> Aljoscha
>
> On 08.09.20 16:59, Dawid Wysakowicz wrote:
> > Ad. 1
> >
> > Yes, you are right in principle.
> >
> > Let me though clarify my proposal a bit. The proposed sort-style
> > execution aims at a generic KeyedProcessFunction where all the
> > "aggregations" are actually performed in the user code. It tries to
> > improve the performance by actually removing the need to use RocksDB
> e.g.:
> >
> >  private static final class Summer
> >  extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>,
> > Tuple2<Integer, Integer>> {
> >
> >  
> >
> >  @Override
> >  public void processElement(
> >  Tuple2<Integer, Integer> value,
> >  Context ctx,
> >  Collector<Tuple2<Integer, Integer>> out) throws Exception {
> >  if (!Objects.equals(timerRegistered.value(), Boolean.TRUE))
> {
> >
> ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> >  timerRegistered.update(true);
> >  }
> >  Integer v = counter.value();
> >  Integer incomingValue = value.f1;
> >  if (v != null) {
> >  v += incomingValue;
> >  } else {
> >  v = incomingValue;
> >  }
> >  counter.update(v);
> >  }
> >
> >  
> >
> > }
> >
> > Therefore I don't think the first part of your reply with separating the
> > write and read workload applies here. We do not aim to create a
> > competing API with the Table API. We think operations such as joins or
> > analytical aggregations should be performed in Table API.
> >
> > As for the second part I agree it would be nice to fall back to the
> > sorting approach only if a certain threshold of memory in a State
> > Backend is used. This has some problems though. We would need a way to
> > estimate the size of the occupied memory to tell when the threshold is
> > reached. That is not easily doable by default e.g. in a
> > MemoryStateBackend, as we do not serialize the values in the state
> > backend by default. We would have to add that, but this would add the
> > overhead of the serialization.
> >
> > This proposal aims at the cases where we do have a large state that will
> > not fit into the memory and without the change users are forced to use
> > RocksDB. If the state fits in memory I agree it will be better to do
> > hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> > think it is important to give users the choice to use one or the other
> > approach. We might discuss which approach should be the default for
> > RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> > user configured state backend or sorting-based with a single key at a
> > time backend. Moreover we could think if we should let users choose the
> > sort vs hash "state backend" per operator. Would that suffice?
> >
> > Ad. 2
> >
> > I still think we can just use the first X bytes of the serialized form
> > as the normalized key and fallback to comparing full keys on clashes. It
> > is because we are actually not interested in a logical order, but we
> > care only about the "grouping" aspect of the sorting. Therefore I think
> > its enough to compare only parts of the full key as the normalized key.
> >
> > Thanks again for the really nice and thorough feedback!
> >
> > Best,
> >
> > Dawid
> >
> > On 08/09/2020 14:47, Kurt Young wrote:
> >> Regarding #1, yes the state backend is definitely hash-based execution.
> >> However there are some differences between
> >> batch hash-based execution. The key difference is *random access &
> >> read/write mixed workload". For example, by using
> >> state backend in streaming execution, one have to mix the read and write
> >> operations and all of 

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Kurt Young
 I would vote for `offset INT SYSTEM_METADATA("offset")`.

I don't think we can stick with the SQL standard in the DDL part forever,
especially as there are more and more
requirements coming from different connectors and external systems.
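
For reference, here is the column syntax above spelled out as a full table
definition. This is only the syntax proposed in this thread, it is not SQL
accepted by any released Flink version, and the columns and options are made
up for illustration:

public class ProposedMetadataColumnSyntax {

    // The FLIP-107 column syntax discussed in this thread, shown in context.
    // Illustrative only: proposed syntax, made-up columns and options.
    static final String PROPOSED_DDL =
            "CREATE TABLE kafka_table (\n"
            + "  id BIGINT,\n"
            + "  name STRING,\n"
            + "  offset INT SYSTEM_METADATA(\"offset\"),\n"        // read-only metadata
            + "  ts TIMESTAMP(3) SYSTEM_METADATA(\"timestamp\")\n" // readable and writable metadata
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'test-topic',\n"
            + "  'value.format' = 'avro'\n"
            + ")";
}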

Best,
Kurt


On Wed, Sep 9, 2020 at 4:40 PM Timo Walther  wrote:

> Hi Jark,
>
> now we are back at the original design proposed by Dawid :D Yes, we
> should be cautious about adding new syntax. But the length of this
> discussion shows that we are looking for a good long-term solution. In
> this case I would rather vote for a deep integration into the syntax.
>
> Computed columns are also not SQL standard compliant. And our DDL is
> neither, so we have some degree of freedom here.
>
> Trying to solve everything via properties sounds rather like a hack to
> me. You are right that one could argue that "timestamp", "headers" are
> something like "key" and "value". However, mixing
>
> `offset AS SYSTEM_METADATA("offset")`
>
> and
>
> `'timestamp.field' = 'ts'`
>
> looks more confusing to users that an explicit
>
> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>
> or
>
> `offset INT SYSTEM_METADATA("offset")`
>
> that is symetric for both source and sink.
>
> What do others think?
>
> Regards,
> Timo
>
>
> On 09.09.20 10:09, Jark Wu wrote:
> > Hi everyone,
> >
> > I think we have a conclusion that the writable metadata shouldn't be
> > defined as a computed column, but a normal column.
> >
> > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches.
> > However, it is not SQL standard compliant, we need to be cautious enough
> > when adding new syntax.
> > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> > resolve the query-sink schema problem if it is read-only metadata. That
> > adds more stuff to learn for users.
> >
> >>From my point of view, the "timestamp", "headers" are something like
> "key"
> > and "value" that stores with the real data. So why not define the
> > "timestamp" in the same way with "key" by using a "timestamp.field"
> > connector option?
> > On the other side, the read-only metadata, such as "offset", shouldn't be
> > defined as a normal column. So why not use the existing computed column
> > syntax for such metadata? Then we don't have the query-sink schema
> problem.
> > So here is my proposal:
> >
> > CREATE TABLE kafka_table (
> >id BIGINT,
> >name STRING,
> >col1 STRING,
> >col2 STRING,
> >ts TIMESTAMP(3) WITH LOCAL TIME ZONE,-- ts is a normal field, so
> can
> > be read and written.
> >offset AS SYSTEM_METADATA("offset")
> > ) WITH (
> >'connector' = 'kafka',
> >'topic' = 'test-topic',
> >'key.fields' = 'id, name',
> >'key.format' = 'csv',
> >'value.format' = 'avro',
> >'timestamp.field' = 'ts'-- define the mapping of Kafka timestamp
> > );
> >
> > INSERT INTO kafka_table
> > SELECT id, name, col1, col2, rowtime FROM another_table;
> >
> > I think this can solve all the problems without introducing any new
> syntax.
> > The only minor disadvantage is that we separate the definition way/syntax
> > of read-only metadata and read-write fields.
> > However, I don't think this is a big problem.
> >
> > Best,
> > Jark
> >
> >
> > On Wed, 9 Sep 2020 at 15:09, Timo Walther  wrote:
> >
> >> Hi Kurt,
> >>
> >> thanks for sharing your opinion. I'm totally up for not reusing computed
> >> columns. I think Jark was a big supporter of this syntax, @Jark are you
> >> fine with this as well? The non-computed column approach was only a
> >> "slightly rejected alternative".
> >>
> >> Furthermore, we would need to think about how such a new design
> >> influences the LIKE clause though.
> >>
> >> However, we should still keep the `PERSISTED` keyword as it influences
> >> the query->sink schema. If you look at the list of metadata for existing
> >> connectors and formats, we currently offer only two writable metadata
> >> fields. Otherwise, one would need to declare two tables whenever a
> >> metadata columns is read (one for the source, one for the sink). This
> >> can be quite inconvientient e.g. for just reading the topic.
> >>
> >> Regards,
> >> Timo
> >>
> >>

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Kurt Young
>>>
> >>>> In any case, the user must provide a CAST such that the computed
> column
> >>>> receives a valid data type when constructing the table schema.
> >>>>
> >>>> "I don't see a reason why `DecodingFormat#applyReadableMetadata`
> needs a
> >>>> DataType argument."
> >>>>
> >>>> Correct he DeserializationSchema doesn't need TypeInfo, it is always
> >>>> executed locally. It is the source that needs TypeInfo for serializing
> >>>> the record to the next operator. And that's this is what we provide.
> >>>>
> >>>> @Danny:
> >>>>
> >>>> “SYSTEM_METADATA("offset")` returns the NULL type by default”
> >>>>
> >>>> We can also use some other means to represent an UNKNOWN data type. In
> >>>> the Flink type system, we use the NullType for it. The important part
> is
> >>>> that the final data type is known for the entire computed column. As I
> >>>> mentioned before, I would avoid the suggested option b) that would be
> >>>> similar to your suggestion. The CAST should be enough and allows for
> >>>> complex expressions in the computed column. Option b) would need
> parser
> >>>> changes.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>>
> >>>> On 08.09.20 06:21, Leonard Xu wrote:
> >>>>> Hi, Timo
> >>>>>
> >>>>> Thanks for you explanation and update,  I have only one question  for
> >>>> the latest FLIP.
> >>>>>
> >>>>> About the MAP DataType of key
> 'debezium-json.source', if
> >>>> user want to use the table name metadata, they need to write:
> >>>>> tableName STRING AS CAST(SYSTEM_METADATA('debeuim-json.source') AS
> >>>> MAP)['table']
> >>>>>
> >>>>> the expression is a little complex for user, Could we only support
> >>>> necessary metas with simple DataType as following?
> >>>>> tableName STRING AS
> CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
> >>>> STRING),
> >>>>> transactionTime LONG AS
> >>>> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS BIGINT),
> >>>>>
> >>>>> In this way, we can simplify the expression, the mainly used
> metadata in
> >>>> changelog format may include
> 'database','table','source.ts_ms','ts_ms' from
> >>>> my side,
> >>>>> maybe we could only support them at first version.
> >>>>>
> >>>>> Both Debezium and Canal have above four metadata, and I‘m willing to
> >>>> take some subtasks in next development if necessary.
> >>>>>
> >>>>> Debezium:
> >>>>> {
> >>>>> "before": null,
> >>>>> "after": {  "id": 101,"name": "scooter"},
> >>>>> "source": {
> >>>>>   "db": "inventory",  # 1. database name the
> >>>> changelog belongs to.
> >>>>>   "table": "products",# 2. table name the
> changelog
> >>>> belongs to.
> >>>>>   "ts_ms": 1589355504100, # 3. timestamp of the
> change
> >>>> happened in database system, i.e.: transaction time in database.
> >>>>>   "connector": "mysql",
> >>>>>   ….
> >>>>> },
> >>>>> "ts_ms": 1589355606100,  # 4. timestamp when the
> debezium
> >>>> processed the changelog.
> >>>>> "op": "c",
> >>>>> "transaction": null
> >>>>> }
> >>>>>
> >>>>> Canal:
> >>>>> {
> >>>>> "data": [{  "id": "102", "name": "car battery" }],
> >>>>> "database": "inventory",  # 1. database name the changelog
> >>>> belongs to.
> >>>>> "table": "products",  # 2. table name the changelog
> belongs
> >>>> to.
> >>>>&g

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Kurt Young
I doubt that any sorting algorithm would work with only knowing that the keys
are different but without
information about which is greater.

Best,
Kurt


On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz 
wrote:

> Ad. 1
>
> Yes, you are right in principle.
>
> Let me though clarify my proposal a bit. The proposed sort-style
> execution aims at a generic KeyedProcessFunction where all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by actually removing the need to use RocksDB e.g.:
>
> private static final class Summer
> extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>,
> Tuple2<Integer, Integer>> {
>
> 
>
> @Override
> public void processElement(
> Tuple2<Integer, Integer> value,
> Context ctx,
> Collector<Tuple2<Integer, Integer>> out) throws Exception {
> if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
> ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> timerRegistered.update(true);
> }
> Integer v = counter.value();
> Integer incomingValue = value.f1;
> if (v != null) {
> v += incomingValue;
> } else {
> v = incomingValue;
> }
> counter.update(v);
> }
>
> 
>
>}
>
> Therefore I don't think the first part of your reply with separating the
> write and read workload applies here. We do not aim to create a
> competing API with the Table API. We think operations such as joins or
> analytical aggregations should be performed in Table API.
>
> As for the second part I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a State
> Backend is used. This has some problems though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state that will
> not fit into the memory and without the change users are forced to use
> RocksDB. If the state fits in memory I agree it will be better to do
> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> user configured state backend or sorting-based with a single key at a
> time backend. Moreover we could think if we should let users choose the
> sort vs hash "state backend" per operator. Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fallback to comparing full keys on clashes. It
> is because we are actually not interested in a logical order, but we
> care only about the "grouping" aspect of the sorting. Therefore I think
> its enough to compare only parts of the full key as the normalized key.
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes the state backend is definitely hash-based execution.
> > However there are some differences between
> > batch hash-based execution. The key difference is *random access &
> > read/write mixed workload". For example, by using
> > state backend in streaming execution, one have to mix the read and write
> > operations and all of them are actually random
> > access. But in a batch hash execution, we could divide the phases into
> > write and read. For example, we can build the
> > hash table first, with only write operations. And once the build is done,
> > we can start to read and trigger the user codes.
> > Take hash aggregation which blink planner implemented as an example,
> during
> > building phase, as long as the hash map
> > could fit into memory, we will update the accumulators directly in the
> hash
> > map. And once we are running out of memory,
> > we then fall back to sort based execution. It improves the performance a
> > lot if the incoming data can be processed in
> > memory.
> >
> > Regarding #2, IIUC you are actually describing a binary format of key,
> not
> > normalized key which is used in DataSet. I will
> > take String for example. If we have lots of keys with

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Kurt Young
Regarding #1, yes the state backend is definitely hash-based execution.
However there are some differences compared with
batch hash-based execution. The key difference is *random access &
read/write mixed workload*. For example, by using a
state backend in streaming execution, one has to mix the read and write
operations, and all of them are actually random
accesses. But in a batch hash execution, we could divide the phases into
write and read. For example, we can build the
hash table first, with only write operations. And once the build is done,
we can start to read and trigger the user code.
Take the hash aggregation which the blink planner implemented as an example:
during the building phase, as long as the hash map
fits into memory, we update the accumulators directly in the hash
map. And once we run out of memory,
we fall back to sort-based execution. It improves the performance a
lot if the incoming data can be processed in
memory.
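
As a plain-Java illustration of that build-then-read pattern (this is not
Flink code, and it leaves out the spilling / fall-back-to-sort part):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashAggregationSketch {
    public static void main(String[] args) {
        // (key, value) records standing in for a bounded input.
        List<int[]> input = Arrays.asList(
                new int[] {1, 10}, new int[] {2, 5}, new int[] {1, 7}, new int[] {3, 1});

        // Build phase: write-only, accumulators are updated directly in the hash map.
        Map<Integer, Integer> accumulators = new HashMap<>();
        for (int[] record : input) {
            accumulators.merge(record[0], record[1], Integer::sum);
        }

        // Read phase: only after the build is done are the results read and emitted.
        accumulators.forEach((key, sum) -> System.out.println("key=" + key + " sum=" + sum));
    }
}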

Regarding #2, IIUC you are actually describing a binary format of the key, not
the normalized key which is used in DataSet. I will
take String as an example. If we have lots of keys with lengths all greater
than, let's say 20. In your proposal, you will encode
the whole string in the prefix of your composed data (  + 
+  ). And when you compare
records, you will actually compare the *whole* key of the record. For
normalized key, it's fixed-length in this case, IIRC it will
take 8 bytes to represent the string. And the sorter will store the
normalized key and offset in a dedicated array. When doing
the sorting, it only sorts this *small* array. If the normalized keys are
different, you could immediately tell which is greater from
normalized keys. You only have to compare the full keys if the normalized
keys are equal and you know in this case the normalized
key couldn't represent the full key. The reason why DataSet is doing this
is that it's super cache-efficient to sort the *small* array.
The idea is borrowed from this paper [1]. Let me know if I missed or
misunderstood anything.

[1] https://dl.acm.org/doi/10.1145/615232.615237 (AlphaSort: a
cache-sensitive parallel external sort)
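
Here is a small plain-Java sketch of the normalized-key idea, only to
illustrate the comparison rule, not how the DataSet sorter actually lays out
memory:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

public class NormalizedKeySketch {
    // Fixed-length normalized key: the first 8 bytes of the UTF-8 encoded
    // string, packed into a long (zero-padded if the string is shorter).
    static long normalizedKey(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        long nk = 0L;
        for (int i = 0; i < 8; i++) {
            int b = i < bytes.length ? (bytes[i] & 0xFF) : 0;
            nk = (nk << 8) | b;
        }
        return nk;
    }

    public static void main(String[] args) {
        String[] keys = {"streaming", "stream", "batch", "streaming-sql", "table"};

        // The sorter would keep (normalizedKey, pointer) pairs in a small,
        // cache-friendly array and sort that. Here we only mimic the comparison
        // rule: decide on the 8-byte prefix, and fall back to comparing the full
        // key only when the prefixes are equal.
        Comparator<String> byNormalizedKey = (a, b) -> {
            int cmp = Long.compareUnsigned(normalizedKey(a), normalizedKey(b));
            return cmp != 0 ? cmp : a.compareTo(b);
        };

        String[] sorted = keys.clone();
        Arrays.sort(sorted, byNormalizedKey);
        System.out.println(Arrays.toString(sorted));
    }
}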

Best,
Kurt


On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz 
wrote:

> Hey Kurt,
>
> Thank you for comments!
>
> Ad. 1 I might have missed something here, but as far as I see it is that
> using the current execution stack with regular state backends (RocksDB
> in particular if we want to have spilling capabilities) is equivalent to
> hash-based execution. I can see a different spilling state backend
> implementation in the future, but I think it is not batch specifc. Or am
> I missing something?
>
> Ad. 2 Totally agree that normalized keys are important to the
> performance. I think though TypeComparators are not a necessity to have
> that. Actually  this proposal is heading towards only ever performing
> "normalized keys" comparison. I have not included in the proposal the
> binary format which we will use for sorting (partially because I forgot,
> and partially because I thought it was too much of an implementation
> detail). Let me include it here though, as it might clear the situation
> a bit here.
>
> In DataSet, at times we have KeySelectors which extract keys based on
> field indices or names. This allows in certain situation to extract the
> key from serialized records. Compared to DataSet, in DataStream, the key
> is always described with a black-box KeySelector, or differently with a
> function which extracts a key from a deserialized record.  In turn there
> is no way to create a comparator that could compare records by
> extracting the key from a serialized record (neither with, nor without
> key normalization). We suggest that the input for the sorter will be
>
>  +  + 
>
> Without having the key prepended we would have to deserialize the record
> for every key comparison.
>
> Therefore if we agree that we perform binary comparison for keys (which
> are always prepended), it is actually equivalent to a DataSet with
> TypeComparators that support key normalization.
>
> Let me know if that is clear, or I have missed something here.
>
> Best,
>
> Dawid
>
> On 08/09/2020 03:39, Kurt Young wrote:
> > Hi Dawid, thanks for bringing this up, it's really exciting to see that
> > batch execution is introduced in DataStream. From the flip, it seems
> > we are sticking with sort based execution mode (at least for now), which
> > will sort the whole input data before any *keyed* operation is
> > executed. I have two comments here:
> >
> > 1. Do we want to introduce hash-based execution in the future? Sort is a
> > safe choice but not the best in lots of cases. IIUC we only need
> > to make sure that before the framework finishes dealing with one key, the
> > operator doesn't see any dat

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-07 Thread Kurt Young
Hi Dawid, thanks for bringing this up, it's really exciting to see that
batch execution is introduced in DataStream. From the flip, it seems
we are sticking with sort based execution mode (at least for now), which
will sort the whole input data before any *keyed* operation is
executed. I have two comments here:

1. Do we want to introduce hash-based execution in the future? Sort is a
safe choice but not the best in lots of cases. IIUC we only need
to make sure that before the framework finishes dealing with one key, the
operator doesn't see any data belonging to other keys, thus
hash-based execution would also do the trick. One tricky thing the
framework might need to deal with is memory constraints and spilling
in the hash map, but Flink also has some good knowledge about this.

2. Going back to sort-based execution and how to sort keys. From my
experience, sorting performance would be one of the most important
things if we want to achieve good performance for batch execution. And
normalized keys are actually the key to sorting performance.
If we want to get rid of TypeComparator, I think we still need to find a
way to introduce them back.

Best,
Kurt


On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek  wrote:

> Yes, I think we can address the problem of indeterminacy in a separate
> FLIP because we're already in it.
>
> Aljoscha
>
> On 07.09.20 17:00, Dawid Wysakowicz wrote:
> > @Seth That's a very good point. I agree that RocksDB has the same
> > problem. I think we can use the same approach for the sorted shuffles
> > then. @Aljoscha I agree we should think about making it more resilient,
> > as I guess users might have problems already if they use keys with
> > non-deterministic binary representation. How do you feel about
> > addressing that separately purely to limit the scope of this FLIP?
> >
> > @Aljoscha I tend to agree with you that the best place to actually place
> > the sorting would be in the InputProcessor(s). If there are no more
> > suggestions in respect to that issue. I'll put this proposal for voting.
> >
> > @all Thank you for the feedback so far. I'd like to start a voting
> > thread on the proposal tomorrow. Therefore I'd appreciate if you comment
> > before that, if you still have some outstanding ideas.
> >
> > Best,
> >
> > Dawid
> >
> > On 04/09/2020 17:13, Aljoscha Krettek wrote:
> >> Seth is right, I was just about to write that as well. There is a
> >> problem, though, because some of our TypeSerializers are not
> >> deterministic even though we use them as if they were. Beam excludes
> >> the FloatCoder, for example, and the AvroCoder in certain cases. I'm
> >> pretty sure there is also weirdness going on in our KryoSerializer.
> >>
> >> On 04.09.20 14:59, Seth Wiesman wrote:
> >>> There is already an implicit assumption the TypeSerializer for keys is
> >>> stable/deterministic, RocksDB compares keys using their serialized byte
> >>> strings. I think this is a non-issue (or at least it's not changing the
> >>> status quo).
> >>>
> >>> On Fri, Sep 4, 2020 at 6:39 AM Timo Walther 
> wrote:
> >>>
>  +1 for getting rid of the TypeComparator interface and rely on the
>  serialized representation for grouping.
> 
>  Adding a new type to DataStream API is quite difficult at the moment
>  due
>  to too many components that are required: TypeInformation (tries to
>  deal
>  with logical fields for TypeComparators), TypeSerializer (incl. it's
>  snapshot interfaces), and TypeComparator (with many methods and
>  internals such normalized keys etc.).
> 
>  If necessary, we can add more simple comparison-related methods to the
>  TypeSerializer interface itself in the future (like
>  TypeSerializer.isDeterministic).
> 
>  Regards,
>  Timo
> 
> 
>  On 04.09.20 11:48, Aljoscha Krettek wrote:
> > Thanks for publishing the FLIP!
> >
> > On 2020/09/01 06:49:06, Dawid Wysakowicz 
> > wrote:
> >> 1. How to sort/group keys? What representation of the key
> >> should we
> >>use? Should we sort on the binary form or should we depend on
> >>Comparators being available.
> >
> > Initially, I suggested to Dawid (in private) to do the
> > sorting/grouping
>  by using the binary representation. Then my opinion switched and I
>  thought
>  we should use TypeComparator/Comparator because that's what the
>  DataSet API
>  uses. After talking to Stephan, I'm again encouraged in my opinion
>  to use
>  the binary representation because it means we can eventually get rid
>  of the
>  TypeComparator interface, which is a bit complicated, and because we
>  don't
>  need any good order in our sort, we only need the grouping.
> >
> > This comes with some problems, though: we need to ensure that the
>  TypeSerializer of the type we're sorting is stable/deterministic.
>  Beam has
>  

Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-08-28 Thread Kurt Young
A quick question: is network memory treated as managed memory now, or will
it be in the future?

Best,
Kurt


On Wed, Aug 26, 2020 at 5:32 PM Xintong Song  wrote:

> Hi devs,
>
> I'd like to bring the discussion over FLIP-141[1], which proposes how
> managed memory should be shared by various use cases within a slot. This is
> an extension to FLIP-53[2], where we assumed that RocksDB state backend and
> batch operators are the only use cases of managed memory for streaming and
> batch jobs respectively, which is no longer true with the introduction of
> Python UDFs.
>
> Please notice that we have not reached consensus between two different
> designs. The major part of this FLIP describes one of the candidates, while
> the alternative is discussed in the section "Rejected Alternatives". We are
> hoping to borrow intelligence from the community to help us resolve the
> disagreement.
>
> Any feedback would be appreciated.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing#FLIP141:IntraSlotManagedMemorySharing-compatibility
>
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Kurt Young
Congratulations Dian!

Best,
Kurt


On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:

> Congratulations Dian!
>
> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei  wrote:
>
>> Congrats!
>>
>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>>
>>> Congratulations Dian!
>>>
>>> Best,
>>> Xingbo
>>>
>>> jincheng sun  于2020年8月27日周四 下午5:24写道:
>>>
 Hi all,

 On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
 part of the Apache Flink Project Management Committee (PMC).

 Dian Fu has been very active on PyFlink component, working on various
 important features, such as the Python UDF and Pandas integration, and
 keeps checking and voting for our releases, and also has successfully
 produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
 forward the release of Flink 1.12.

 Please join me in congratulating Dian Fu for becoming a Flink PMC
 Member!

 Best,
 Jincheng(on behalf of the Flink PMC)

>>>
>
> --
> Best regards!
> Rui Li
>


Re: [VOTE] FLIP-132: Temporal Table DDL and Temporal Table Join

2020-08-24 Thread Kurt Young
+1, making concepts clear and understandable to all the developers is a
very important thing.
Thanks Leonard for driving this.

Best,
Kurt

On Tue, Aug 25, 2020 at 10:47 AM Rui Li  wrote:

> +1. Thanks Leonard for driving this.
>
> On Tue, Aug 25, 2020 at 10:10 AM Jark Wu  wrote:
>
> > Thanks Leonard!
> >
> > +1 to the FLIP.
> >
> > Best,
> > Jark
> >
> > On Tue, 25 Aug 2020 at 01:41, Fabian Hueske  wrote:
> >
> >> Leonard, Thanks for updating the FLIP!
> >>
> >> +1 to the current version.
> >>
> >> Thanks, Fabian
> >>
> >> On Mon, Aug 24, 2020 at 5:56 PM, Leonard Xu <
> xbjt...@gmail.com
> >> >:
> >>
> >>> Hi all,
> >>>
> >>> I would like to start the vote for FLIP-132 [1], which has been
> >>> discussed and
> >>> reached a consensus in the discussion thread [2].
> >>>
> >>> The vote will be open until 27th August (72h), unless there is an
> >>> objection or not enough votes.
> >>>
> >>> Best,
> >>> Leonard
> >>> [1]
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join
> >>>
> >>> [2]
> >>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html
> >>>
> >>>
> >>>
>
> --
> Best regards!
> Rui Li
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-16 Thread Kurt Young
Hi Kostas,

Thanks for starting this discussion. The first part of this FLIP: "Batch vs
Streaming Scheduling" looks reasonable to me.
However, there is another dimension I think we should also take into
consideration, which is whether checkpointing is enabled.

This option is mostly (but not fully) orthogonal to the boundedness and
persistence of the input. For example, consider an arbitrary operator
that uses state: we can enable checkpointing to achieve better failure
recovery if the input is bounded and pipelined. And if the input is bounded
and persistent, we can still use checkpointing, but we might need to
checkpoint the offset of the intermediate result set of the operator. This
would require much more work and we can defer it to the future.

Beyond this dimension, there is another question to be asked. If the
topology mixes bounded and unbounded inputs, what would be the behavior?
E.g. a join operator with one of its inputs bounded and the other input
unbounded. Can we still use BATCH or STREAMING to define the scheduling
policy? What kind of failure recovery guarantee can Flink provide to users?

I don't have a clear answer for now, but just want to raise these questions
to seek some discussion.
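
To make the question concrete, here is a minimal sketch of such a mixed
topology (the concrete sources are arbitrary and only for illustration):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MixedBoundednessExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded input: a small, finite stream of dimension-like data.
        DataStream<String> bounded = env.fromElements("EUR", "USD", "YEN");

        // Unbounded input: an endless stream of events, e.g. from a socket.
        DataStream<String> unbounded = env.socketTextStream("localhost", 9999);

        // A two-input operator consuming one bounded and one unbounded input:
        // which scheduling mode and which recovery guarantee should apply here?
        bounded.connect(unbounded)
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    @Override
                    public void flatMap1(String dim, Collector<String> out) {
                        out.collect("dim: " + dim);
                    }

                    @Override
                    public void flatMap2(String event, Collector<String> out) {
                        out.collect("event: " + event);
                    }
                })
                .print();

        env.execute("mixed bounded/unbounded topology");
    }
}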

Best,
Kurt


On Wed, Aug 12, 2020 at 11:22 PM Kostas Kloudas  wrote:

> Hi all,
>
> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
> API in favour of the DataStream API and the Table API. After this work
> is done, the user will be able to write a program using the DataStream
> API and this will execute efficiently on both bounded and unbounded
> data. But before we reach this point, it is worth discussing and
> agreeing on the semantics of some operations as we transition from the
> streaming world to the batch one.
>
> This thread and the associated FLIP [2] aim at discussing these issues
> as these topics are pretty important to users and can lead to
> unpleasant surprises if we do not pay attention.
>
> Let's have a healthy discussion here and I will be updating the FLIP
> accordingly.
>
> Cheers,
> Kostas
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-08-11 Thread Kurt Young
The content length of FLIP-107 is relatively short but the scope and
implications it will cause are actually very big.
From what I can tell now, I think there is a good chance that we can
deliver part of this FLIP in 1.12, e.g.
accessing the metadata field just like you mentioned.
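
For illustration, a sketch of the kind of declaration this would enable (the
DDL syntax below is hypothetical and only meant to show the idea; the FLIP
discussion will settle the actual keywords):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MetadataColumnSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical DDL: 'ts' is read from the record's metadata (e.g. the
        // Kafka timestamp) instead of the message body and then used as rowtime.
        tEnv.executeSql(
                "CREATE TABLE events (" +
                "  user_id STRING," +
                "  ts TIMESTAMP(3) METADATA FROM 'timestamp'," +  // hypothetical syntax
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'events'," +
                "  'format' = 'json'" +
                ")");
    }
}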

Best,
Kurt


On Tue, Aug 11, 2020 at 7:18 PM Dongwon Kim  wrote:

>  Big +1 for this FLIP.
>
> Recently I've been working on some Kafka topics that have timestamps as
> metadata, not in the message body. I want to declare a table from the
> topics with DDL, but "rowtime_column_name" in the watermark definition seems
> to accept only existing columns.
>
> > :
> >   WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
> >
> >
> I raised an issue on the user@ list, but committers advised alternative
> approaches that call for detailed knowledge of Flink, like a custom decoding
> format or conversion between the DataStream API and the TableEnvironment. That
> goes against the main advantage of Flink SQL: simplicity and ease of use. IMHO
> this FLIP must be implemented in order for users to derive tables freely from
> any Kafka topic without having to involve the DataStream API.
>
> Best,
>
> Dongwon
>
> On 2020/03/01 14:30:31, Dawid Wysakowicz  wrote:
> > Hi,
> >
> > I would like to propose an improvement that would enable reading table
> > columns from different parts of source records. Besides the main payload,
> > the majority (if not all) of the sources expose additional information. It
> > can be simply read-only metadata such as offset or ingestion time, or
> > read and write parts of the record that contain data but additionally
> > serve different purposes (partitioning, compaction etc.), e.g. key or
> > timestamp in Kafka.
> >
> > We should make it possible to read and write data from all of those
> > locations. In this proposal I discuss reading partitioning data; for
> > completeness, the proposal also discusses partitioning when writing
> > data out.
> >
> > I am looking forward to your comments.
> >
> > You can access the FLIP here:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> >
> > Best,
> >
> > Dawid
> >
> >
> >
>


Re: [DISCUSS] Planning Flink 1.12

2020-08-03 Thread Kurt Young
Regarding setting the feature freeze date to late September, I have some
concern that it might make
the development time of 1.12 too short.

One reason for this is that we took too much time (about 1.5 months, from
mid-May to the beginning of July) for testing 1.11. That's not ideal, but
further squeezing the development time of 1.12 won't make it better.
Besides, AFAIK July & August are also a popular vacation season in Europe.
Given the fact that most committers of Flink come from Europe, I think we
should also take this into consideration.

It's also true that the first week of October is the national holiday of
China, so I'm wondering whether the
end of October could be a candidate feature freeze date.

Best,
Kurt


On Tue, Jul 28, 2020 at 2:41 AM Robert Metzger  wrote:

> Hi all,
>
> Thanks a lot for the responses so far. I've put them into this Wiki page:
> https://cwiki.apache.org/confluence/display/FLINK/1.12+Release to keep
> track of them. Ideally, post JIRA tickets for your feature, then the status
> will update automatically in the wiki :)
>
> Please keep posting features here, or add them to the Wiki yourself 
>
> @Prasanna kumar : Dynamic Auto Scaling is a
> feature request the community is well-aware of. Till has posted
> "Reactive-scaling mode" as a feature he's working on for the 1.12 release.
> This work will introduce the basic building blocks and partial support for
> the feature you are requesting.
> Proper support for dynamic scaling, while maintaining Flink's high
> performance (throughput, low latency) and correctness is a difficult task
> that needs a lot of work. It will probably take a little bit of time till
> this is fully available.
>
> Cheers,
> Robert
>
>
>
> On Thu, Jul 23, 2020 at 2:27 PM Till Rohrmann 
> wrote:
>
> > Thanks for being our release managers for the 1.12 release Dian & Robert!
> >
> > Here are some features I would like to work on for this release:
> >
> > # Features
> >
> > ## Finishing pipelined region scheduling (
> > https://issues.apache.org/jira/browse/FLINK-16430)
> > With the pipelined region scheduler we want to implement a scheduler
> which
> > can serve streaming as well as batch workloads alike while being able to
> > run jobs under constrained resources. The latter is particularly
> important
> > for bounded streaming jobs which, currently, are not well supported.
> >
> > ## Reactive-scaling mode
> > Being able to react to newly available resources and rescaling a running
> > job accordingly will make Flink's operation much easier because resources
> > can then be controlled by an external tool (e.g. GCP autoscaling, K8s
> > horizontal pod scaler, etc.). In this release we want to make a big step
> > towards this direction. As a first step we want to support the execution
> of
> > jobs with a parallelism which is lower than the specified parallelism in
> > case that Flink lost a TaskManager or could not acquire enough resources.
> >
> > # Maintenance/Stability
> >
> > ## JM / TM finished task reconciliation (
> > https://issues.apache.org/jira/browse/FLINK-17075)
> > This prevents the system from going out of sync if a task state change
> from
> > the TM to the JM is lost.
> >
> > ## Make metrics services work with Kubernetes deployments (
> > https://issues.apache.org/jira/browse/FLINK-11127)
> > Invert the direction in which the MetricFetcher connects to the
> > MetricQueryFetchers. That way it will no longer be necessary to expose on
> > K8s for every TaskManager a port on which the MetricQueryFetcher runs.
> This
> > will then make the deployment of Flink clusters on K8s easier.
> >
> > ## Handle long-blocking operations during job submission (savepoint
> > restore) (https://issues.apache.org/jira/browse/FLINK-16866)
> > Submitting a Flink job can involve the interaction with external systems
> > (blocking operations). Depending on the job the interactions can take so
> > long that it exceeds the submission timeout which reports a failure on
> the
> > client side even though the actual submission succeeded. By decoupling
> the
> > creation of the ExecutionGraph from the job submission, we can make the
> job
> > submission non-blocking which will solve this problem.
> >
> > ## Make IDs more intuitive to ease debugging (FLIP-118) (
> > https://issues.apache.org/jira/browse/FLINK-15679)
> > By making the internal Flink IDs compositional or logging how they belong
> > together, we can make the debugging of Flink's operations much easier.
> >
> > Cheers,
> > Till
> >
> >
> > On Thu, Jul 23, 2020 at 7:48 AM Canbin Zheng 
> > wrote:
> >
> > > Hi All,
> > >
> > > Thanks for bring-up this discussion, Robert!
> > > Congratulations on becoming the release manager of 1.12, Dian and
> Robert
> > !
> > >
> > > --
> > > Here are some of my thoughts of the features for native integration
> with
> > > Kubernetes in Flink 1.12:
> > >
> > > 1. Support user-specified pod templates
> > > Description:
> > > The current approach of introducing new 

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

2020-07-31 Thread Kurt Young
1. Even if there are some "Supports" interfaces that are not orthogonal
with ScanTableSource and LookupTableSource,
it doesn't mean we should encourage such usage. Such concept conflicts will
accumulate to larger issues which will
hurt us in the future.

2. Regarding SupportsStatisticsReport, I think the interface is a bit
fuzzy. From the interface name, I was expecting that
the source will try to gather and report statistics of its own. But it
also receives some catalog statistics, so what is this for?
Why does the table source need to *report* statistics when there already
exist some statistics from the catalog? Would these
catalog statistics always exist?

3.
> Regarding If there are multiple Transformations in source op, and they
> require different parallelism. In this case, it should be left to the
> source to set the parallelism
This sounds like a contradiction to the interface you want to introduce.
I'm more confused: do you want the framework to take care of
the parallelism setting for the source operator, or do you want to let the
source operator set the parallelism?

Best,
Kurt


On Fri, Jul 31, 2020 at 1:43 PM Jingsong Li  wrote:

> Hi, thanks for your responses.
>
> To Benchao:
>
> Glad to see your works and requirements, they should be Public.
>
> To Kurt:
>
> 1.Regarding "SupportsXXX" for ScanTableSource or LookupTableSource
> or DynamicTableSink, I don't think a "SupportsXXX" must work with all these
> three types. As Godfrey said, Such as a LookupTableSource should not extend
> from SupportsWatermarkPushDown and SupportsComputedColumnPushDown. We just
> try our best to make all combinations work, like
> "SupportsParallelismReport", it can work with both ScanTableSource
> and DynamicTableSink.
>
> About adding the method "reportParallelism" directly to
> ScanTableSource and DynamicTableSink: I think most sources/sinks
> do not want to see this method; providing a "SupportsXXX" interface aims to
> give connector developers an optional choice.
>
> 2.Regarding SupportsStatisticsReport not working for unbounded streaming
> table sources: yes, that is true, the statistics (including catalog statistics)
> are not related to stream tables, but I think, in the future, we can create
> more useful statistics information for streaming tables.
>
> 3."oldStats" in SupportsStatisticsReport, "oldStats" should be named to
> "catalogStats", source just try its best to get more useful and accurate
> statistic information, but just like Godfrey said, it is a supplement to
> catalog statistics, it can just supplement missing or inaccurate
> information in the catalog.
>
> 4.Internal or Public, I am glad to see your requirements, I am OK with
> Public.
>
> To Godfrey:
>
> Regarding If there are multiple Transformations in source op, and they
> require different parallelism. In this case, it should be left to the
> source to set the parallelism. So, these should be two things that are
> orthogonal. Users who do not use multi Transformations still need to set
> parallelism.
>
> Best,
> Jingsong
>
> On Thu, Jul 30, 2020 at 8:31 PM godfrey he  wrote:
>
> > Thanks Jingsong for bringing up this discussion,
> >  and thanks Kurt for the detailed thoughts.
> >
> > First of all, I also think it's a very useful feature to expose more
> > ability for table source.
> >
> > 1) If we want to support [1], it's seem that SupportsParallelismReport
> > does not meet the requirement: If there are multiple Transformations in
> > source op,
> > and they require different parallelism.
> >
> > 2) regarding to "SupportsXXX" for ScanTableSource or LookupTableSource,
> > Currently, we also do not distinguish them for the existing
> "SupportsXXX".
> > Such as a LookupTableSource should not extend from
> > SupportsWatermarkPushDown
> > and SupportsComputedColumnPushDown.
> > A DynamicTableSource sub-class will extend from "SupportsXXX" only if it
> > has the capability,
> > So the unbounded table source should not extend from
> > SupportsStatisticsReport,
> > or just return unknown for unbounded if a table source can work for both
> > bounded and unbounded.
> >
> > I think SupportsStatisticsReport is a supplement to catalog statistics,
> > that means
> > only catalog statistic is unknown, SupportsStatisticsReport works.
> >
> > 3)  +1 to make them as public.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18674
> >
> > Best,
> > Godfrey
> >
> >
> >
> > Kurt Young  wrote on Thu, Jul 30, 2020 at 4:01 PM:
> >
> > > Hi Jingsong,
> >

Re: [DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-30 Thread Kurt Young
+1, looking forward to the follow up FLIPs.

Best,
Kurt


On Thu, Jul 30, 2020 at 6:40 PM Arvid Heise  wrote:

> +1 of getting rid of the DataSet API. Is DataStream#iterate already
> superseding DataSet iterations or would that also need to be accounted for?
>
> In general, all surviving APIs should also offer a smooth experience for
> switching back and forth.
>
> On Thu, Jul 30, 2020 at 9:39 AM Márton Balassi 
> wrote:
>
> > Hi All,
> >
> > Thanks for the write up and starting the discussion. I am in favor of
> > unifying the APIs the way described in the FLIP and deprecating the
> DataSet
> > API. I am looking forward to the detailed discussion of the changes
> > necessary.
> >
> > Best,
> > Marton
> >
> > On Wed, Jul 29, 2020 at 12:46 PM Aljoscha Krettek 
> > wrote:
> >
> >> Hi Everyone,
> >>
> >> my colleagues (in cc) and I would like to propose this FLIP for
> >> discussion. In short, we want to reduce the number of APIs that we have
> >> by deprecating the DataSet API. This is a big step for Flink, that's why
> >> I'm also cross-posting this to the User Mailing List.
> >>
> >> FLIP-131: http://s.apache.org/FLIP-131
> >>
> >> I'm posting the introduction of the FLIP below but please refer to the
> >> document linked above for the full details:
> >>
> >> --
> >> Flink provides three main SDKs/APIs for writing Dataflow Programs: Table
> >> API/SQL, the DataStream API, and the DataSet API. We believe that this
> >> is one API too many and propose to deprecate the DataSet API in favor of
> >> the Table API/SQL and the DataStream API. Of course, this is easier said
> >> than done, so in the following, we will outline why we think that having
> >> too many APIs is detrimental to the project and community. We will then
> >> describe how we can enhance the Table API/SQL and the DataStream API to
> >> subsume the DataSet API's functionality.
> >>
> >> In this FLIP, we will not describe all the technical details of how the
> >> Table API/SQL and DataStream will be enhanced. The goal is to achieve
> >> consensus on the idea of deprecating the DataSet API. There will have to
> >> be follow-up FLIPs that describe the necessary changes for the APIs that
> >> we maintain.
> >> --
> >>
> >> Please let us know if you have any concerns or comments. Also, please
> >> keep discussion to this ML thread instead of commenting in the Wiki so
> >> that we can have a consistent view of the discussion.
> >>
> >> Best,
> >> Aljoscha
> >>
> >
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

2020-07-30 Thread Kurt Young
Hi Jingsong,

Thanks for bringing up this discussion. In general, I'm +1 to enrich the
source ability by
the parallelism and stats reporting, but I'm not sure whether introducing
such "Supports"
interface is a good idea. I will share my thoughts separately.

1) Regarding the interface SupportsParallelismReport, first of all, my
feeling is that such a mechanism
is not like other abilities like SupportsProjectionPushDown. Parallelism of
source operator would be
decided anyway, the only difference here is whether it's decided purely by
framework or by table source
itself. So another angle to understand this issue is, we can always assume
a table source has the
ability to determine the parallelism. The table source can choose to set
the parallelism by itself, or delegate
it to the framework.

This might sound like personal taste, but there is another bad case if we
introduce the interface. You
may already know we currently have two major table
sources, LookupTableSource and ScanTableSource.
IIUC it won't make much sense if the user provides a LookupTableSource and
also implements
SupportsParallelismReport.

An alternative solution would be to add the method you want directly
to ScanTableSource, and also have a default implementation returning -1,
which means letting the framework decide the parallelism (a rough sketch of
that alternative shape follows at the end of this message).

2) Regarding the interface SupportsStatisticsReport, it seems this
interface doesn't work for unbounded
streaming table sources. What kind of implementation do you expect in such
a case? And how does this
interface work with LookupTableSource?
Another question is what the oldStats parameter is used for?

3) Internal or Public. I don't think we should mark them as internal. The fact
that they are currently only used by internal
connectors doesn't mean this interface should be internal. I can imagine
there will be lots of Filesystem like
connectors outside the project which need such capability.
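
Coming back to the alternative mentioned under 1), here is a rough sketch of
that shape (hypothetical code, not an existing Flink interface), just to make
the comparison concrete:

// Hypothetical: the method lives directly on the source interface instead of
// being a separate "Supports" mix-in.
public interface HypotheticalScanTableSource {

    // ... the existing scan-related methods would stay as they are ...

    /**
     * Returns the parallelism this source suggests for its runtime operator,
     * or -1 to let the framework decide.
     */
    default int reportParallelism() {
        return -1; // default: the framework decides
    }
}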

Best,
Kurt


On Thu, Jul 30, 2020 at 1:02 PM Benchao Li  wrote:

> Hi Jingsong,
>
> Regarding SupportsParallelismReport,
> I think the streaming connectors can also benefit from it.
> I see some requirements from user ML that they want to control
> source/sink's parallelism instead
> to set them to global parallelism.
> Also, in our compony, we did this too.
>
> Jingsong Li  wrote on Thu, Jul 30, 2020 at 11:16 AM:
>
> > Hi all,
> >
> > ## SupportsParallelismReport
> >
> > Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using
> the
> > old interfaces.
> >
> > We are considering migrating to the new interface.
> >
> > However, one problem is that in the old interface implementation,
> > connectors infer parallelism by itself instead of a global parallelism
> > configuration. Hive & filesystem determines the parallelism size
> according
> > to the number of files and the size of the file. In this way, large
> tables
> > may use thousands of parallelisms, while small tables only have 10
> > parallelisms, which can minimize the consumption of task scheduling.
> >
> > This situation is very common in batch computing. For example, in the
> star
> > model, a large table needs to be joined with multiple small tables.
> >
> > So we should give this ability to new table source interfaces. The
> > interface can be:
> >
> > /**
> >  * Enables to give source the ability to report parallelism.
> >  *
> >  * After filtering push down and partition push down, the source
> > can have more information,
> >  * which can help it infer more effective parallelism.
> >  */
> > @Internal
> > public interface SupportsParallelismReport {
> >
> >/**
> > * Report parallelism from source or sink. The parallelism of an
> > operator must be at least 1,
> > * or -1 (use system default).
> > */
> >int reportParallelism();
> > }
> >
> >
> > Rejected Alternatives:
> > - SupportsSplitReport: What is the relationship between this split and
> the
> > split of FLIP-27? Do we have to match them one by one? I think they are
> two
> > independent things. In fact, the design of FLIP-27, split and parallelism
> > are not bound one by one.
> > - SupportsPartitionReport: What is partition? Actually, in table/SQL,
> > partition is a special concept of table. It should not be mixed with
> > parallelism.
> >
> > ## SupportsStatisticsReport
> >
> > As with parallelism, statistics information from source will be more
> > appropriate and accurate. After filtering push down and partition push
> > down, the source can have more information, which can help it infer more
> > effective statistics. However, if we only infer from the planner itself,
> it
> > may lead to a big gap between the statistics information and the real
> > situation.
> >
> > The interface:
> >
> > /**
> >  * Enables to give {@link ScanTableSource} the ability to report table
> > statistics.
> >  *
> >  * Statistics can be inferred from real data in real time,  it is
> > more accurate than the
> >  * statistics in the catalog.
> >  *
> >  * After filtering push down and partition push down, the 

Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release candidate #4)

2020-07-23 Thread Kurt Young
From my experience, Java profilers are sometimes not accurate enough to
find out the root cause of a performance regression. In this case, I would
suggest you try out Intel VTune Amplifier to watch more detailed metrics.

Best,
Kurt


On Fri, Jul 24, 2020 at 8:51 AM Thomas Weise  wrote:

> The cause of the issue is anything but clear.
>
> Previously I had mentioned that there is no suspect change to the Kinesis
> connector and that I had reverted the AWS SDK change to no effect.
>
> https://issues.apache.org/jira/browse/FLINK-17496 actually fixed another
> regression in the previous release and is present before and after.
>
> I repeated the run with 1.11.0 core and downgraded the entire Kinesis
> connector to 1.10.1: Nothing changes, i.e. the regression is still present.
> Therefore we will need to look elsewhere for the root cause.
>
> Regarding the time spent in snapshotState, repeat runs reveal a wide range
> for both versions, 1.10 and 1.11. So again this is nothing pointing to a
> root cause.
>
> At this point, I have no ideas remaining other than doing a bisect to find
> the culprit. Any other suggestions?
>
> Thomas
>
>
> On Thu, Jul 16, 2020 at 9:19 PM Zhijiang  .invalid>
> wrote:
>
> > Hi Thomas,
> >
> > Thanks for your further profiling information and glad to see we already
> > pinpointed the location causing the regression.
> > Actually I was also suspicious of the point of #snapshotState in previous
> > discussions since it indeed cost much time to block normal operator
> > processing.
> >
> > Based on your below feedback, the sleep time during #snapshotState might
> > be the main concern, and I also digged into the implementation of
> > FlinkKinesisProducer#snapshotState.
> > while (producer.getOutstandingRecordsCount() > 0) {
> >producer.flush();
> >try {
> >   Thread.sleep(500);
> >} catch (InterruptedException e) {
> >   LOG.warn("Flushing was interrupted.");
> >   break;
> >}
> > }
> > It seems that the sleep time is mainly affected by the internal
> operations
> > inside KinesisProducer implementation provided by amazonaws, which I am
> not
> > quite familiar with.
> > But I noticed there were two upgrades related to it in release-1.11.0.
> One
> > is for upgrading amazon-kinesis-producer to 0.14.0 [1] and another is for
> > upgrading aws-sdk-version to 1.11.754 [2].
> > You mentioned that you already reverted the SDK upgrade to verify no
> > changes. Did you also revert the [1] to verify?
> > [1] https://issues.apache.org/jira/browse/FLINK-17496
> > [2] https://issues.apache.org/jira/browse/FLINK-14881
> >
> > Best,
> > Zhijiang
> > --
> > From:Thomas Weise 
> > Send Time:2020年7月17日(星期五) 05:29
> > To:dev 
> > Cc:Zhijiang ; Stephan Ewen  >;
> > Arvid Heise ; Aljoscha Krettek  >
> > Subject:Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release
> > candidate #4)
> >
> > Sorry for the delay.
> >
> > I confirmed that the regression is due to the sink (unsurprising, since
> > another job with the same consumer, but not the producer, runs as
> > expected).
> >
> > As promised I did CPU profiling on the problematic application, which
> gives
> > more insight into the regression [1]
> >
> > The screenshots show that the average time for snapshotState increases
> from
> > ~9s to ~28s. The data also shows the increase in sleep time during
> > snapshotState.
> >
> > Does anyone, based on changes made in 1.11, have a theory why?
> >
> > I had previously looked at the changes to the Kinesis connector and also
> > reverted the SDK upgrade, which did not change the situation.
> >
> > It will likely be necessary to drill into the sink / checkpointing
> details
> > to understand the cause of the problem.
> >
> > Let me know if anyone has specific questions that I can answer from the
> > profiling results.
> >
> > Thomas
> >
> > [1]
> >
> >
> https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit?usp=sharing
> >
> > On Mon, Jul 13, 2020 at 11:14 AM Thomas Weise  wrote:
> >
> > > + dev@ for visibility
> > >
> > > I will investigate further today.
> > >
> > >
> > > On Wed, Jul 8, 2020 at 4:42 AM Aljoscha Krettek 
> > > wrote:
> > >
> > >> On 06.07.20 20:39, Stephan Ewen wrote:
> > >> >- Did sink checkpoint notifications change in a relevant way, for
> > >> example
> > >> > due to some Kafka issues we addressed in 1.11 (@Aljoscha maybe?)
> > >>
> > >> I think that's unrelated: the Kafka fixes were isolated in Kafka and
> the
> > >> one bug I discovered on the way was about the Task reaper.
> > >>
> > >>
> > >> On 07.07.20 17:51, Zhijiang wrote:
> > >> > Sorry for my misunderstood of the previous information, Thomas. I
> was
> > >> assuming that the sync checkpoint duration increased after upgrade as
> it
> > >> was mentioned before.
> > >> >
> > >> > If I remembered correctly, the memory state backend also has the
> same
> > >> issue? If so, we can dismiss the rocksDB state 

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

2020-07-22 Thread Kurt Young
Thanks for the reply. I have one more comment about the effect on the
optimizer. Even if you are trying to make the cached table as orthogonal to
the optimizer as possible by introducing a special sink, it is still not
clear why this approach is safe. Maybe you can add a description of the
process from API to JobGraph; otherwise I can't make sure everyone reviewing
the design doc will have the same picture of this. And I'm also quite sure
some of the existing mechanisms will be affected by this special sink, e.g.
multi-sink optimization.

Best,
Kurt


On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su  wrote:

> Hi Kurt,
>
> Thanks for the comments.
>
> 1. How do you identify the CachedTable?
> For the current design proposed in FLIP-36, we are using the first
> approach you mentioned, where the key of the map is the Cached Table java
> object. I think it is fine not to be able to identify another table
> representing the same DAG and not using the cached intermediate result
> because we want to make the caching table explicit. As mentioned in the
> FLIP, the cache API will return a Table object. And the user has to use the
> returned Table object to make use of the cached table. The rationale is
> that if the user builds the same DAG from scratch with some
> TableEnvironment instead of using the cached table object, the user
> probably doesn't want to use the cache.
>
> 2. How does the CachedTable affect the optimizer?
> We try to make the logic dealing with the cached table be as orthogonal to
> the optimizer as possible. That's why we introduce a special sink when we
> are going to cache a table and a special source when we are going to use a
> cached table. This way, we can let the optimizer does it works, and the
> logic of modifying the job graph can happen in the job graph generator. We
> can recognize the cached node with the special sink and source.
>
> 3. What's the effect of calling TableEnvironment.close()?
> We introduce the close method to prevent leaking of the cached table when
> the user is done with the table environment. Therefore, it makes more sense
> that the table environment, including all of its functionality, should not
> be used after closing. Otherwise, we should rename the close method to
> clearAllCache or something similar.
>
> And thanks for pointing out the use of not existing API used in the given
> examples. I have updated the examples in the FLIP accordingly.
>
> Best,
> Xuannan
> On Jul 16, 2020, 4:15 PM +0800, Kurt Young , wrote:
> > Hi Xuanna,
> >
> > Thanks for the detailed design doc, it described clearly how the API
> looks
> > and how to interact with Flink runtime.
> > However, the part which relates to SQL's optimizer is kind of blurry. To
> be
> > more precise, I have following questions:
> >
> > 1. How do you identify the CachedTable? I can imagine there would be a map
> > representing the cache; how do you
> > compare the keys of the map? One approach is that they will be compared as
> Java
> > objects, which is simple but has
> > limited scope. For example, users created another table using some
> > interfaces of TableEnvironment, and the table
> > is exactly the same as the cached one, you won't be able to identify it.
> > Another choice is calculating the "signature" or
> > "diest" of the cached table, which involves string representation of the
> > whole sub tree represented by the cached table.
> > I don't think Flink currently provides such a mechanism around Table
> > though.
> >
> > 2. How does the CachedTable affect the optimizer? Specifically, will you
> > have a dedicated QueryOperation for it, will you have
> > a dedicated logical & physical RelNode for it? And I also don't see a
> > description about how to work with current optimize phases,
> > from Operation to Calcite rel node, and then to Flink's logical and
> > physical node, which will be at last translated to Flink's exec node.
> > There also exists other optimizations such as dead lock breaker, as well
> as
> > sub plan reuse inside the optimizer, I'm not sure whether
> > the logic dealing with cached tables can be orthogonal to all of these.
> > Hence I expect you could have a more detailed description here.
> >
> > 3. What's the effect of calling TableEnvironment.close()? You already
> > explained this would drop all caches this table env has,
> > could you also explain where other functionality still works for this
> table
> > env? Like can use still create/drop tables/databases/function
> > through this table env? What happens to the catalog and all temporary
> > objects of this table env?
> >
> > One m

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

2020-07-16 Thread Kurt Young
Hi Xuanna,

Thanks for the detailed design doc, it described clearly how the API looks
and how to interact with Flink runtime.
However, the part which relates to SQL's optimizer is kind of blurry. To be
more precise, I have following questions:

1. How do you identify the CachedTable? I can imagine there would be a map
representing the cache; how do you
compare the keys of the map? One approach is that they will be compared as Java
objects, which is simple but has
limited scope. For example, users created another table using some
interfaces of TableEnvironment, and the table
is exactly the same as the cached one, you won't be able to identify it.
Another choice is calculating the "signature" or
"diest" of the cached table, which involves string representation of the
whole sub tree represented by the cached table.
I don't think Flink currently provides such a mechanism around Table
though.

2. How does the CachedTable affect the optimizer? Specifically, will you
have a dedicated QueryOperation for it, will you have
a dedicated logical & physical RelNode for it? And I also don't see a
description about how to work with current optimize phases,
from Operation to Calcite rel node, and then to Flink's logical and
physical node, which will be at last translated to Flink's exec node.
There also exists other optimizations such as dead lock breaker, as well as
sub plan reuse inside the optimizer, I'm not sure whether
the logic dealing with cached tables can be orthogonal to all of these.
Hence I expect you could have a more detailed description here.

3. What's the effect of calling TableEnvironment.close()? You already
explained this would drop all caches this table env has,
could you also explain whether other functionality still works for this table
env? Like can users still create/drop tables/databases/function
through this table env? What happens to the catalog and all temporary
objects of this table env?

One minor comment: I noticed you used some non-existing APIs in the examples
you gave, like table.collect(), which is a little
misleading.

Best,
Kurt


On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su  wrote:

> Hi folks,
>
> I'd like to revive the discussion about FLIP-36 Support Interactive
> Programming in Flink Table API
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>
> The FLIP proposes to add support for interactive programming in Flink
> Table API. Specifically, it lets users cache the intermediate
> results (tables) and use them in later jobs to avoid recomputing the
> intermediate results (tables).
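>
> A minimal usage sketch of the proposed API (the cache() call is the method
> proposed in the FLIP, not something that exists in Flink today):
>
> // Illustrative only: assumes a TableEnvironment `tEnv` and an input table `clicks`.
> Table expensive = tEnv.sqlQuery(
>     "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");
>
> // Proposed: mark the intermediate result for caching; it is materialized
> // when the job containing it is executed.
> Table cached = expensive.cache();
>
> // Re-using the *returned* Table object is what opts in to reading the cached
> // result; rebuilding the same query from scratch would not hit the cache.
> cached.execute().print();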
>
> I am looking forward to any opinions and suggestions from the community.
>
> Best,
> Xuannan
> On May 7, 2020, 5:40 PM +0800, Xuannan Su , wrote:
> > Hi,
> >
> > There are some feedbacks from @Timo and @Kurt in the voting thread for
> FLIP-36 and I want to share my thoughts here.
> >
> > 1. How would the FLIP-36 look like after FLIP-84?
> > I don't think FLIP-84 will affect FLIP-36 from the public API
> perspective. Users can call .cache on a table object and the cached table
> will be generated whenever the table job is triggered to execute, either by
> Table#executeInsert or StatementSet#execute. I think that FLIP-36 should
> aware of the changes made by FLIP-84, but it shouldn't be a problem. At the
> end of the day, FLIP-36 only requires the ability to add a sink to a node,
> submit a table job with multiple sinks, and replace the cached table with a
> source.
> >
> > 2. How can we support cache in a multi-statement SQL file?
> > The most intuitive way to support cache in a multi-statement SQL file is
> by using a view, where the view is corresponding to a cached table.
> >
> > 3. Unifying the cached table and materialized views
> > It is true that the cached table and the materialized view are similar
> in some way. However, I think the materialized view is a more complex
> concept. First, a materialized view requires some kind of a refresh
> mechanism to synchronize with the table. Secondly, the life cycle of a
> materialized view is longer. The materialized view should be accessible
> even after the application exits and should be accessible by another
> application, while the cached table is only accessible in the application
> where it is created. The cached table is introduced to avoid recomputation
> of an intermediate table to support interactive programming in Flink Table
> API. And I think the materialized view needs more discussion and certainly
> deserves a whole new FLIP.
> >
> > Please let me know your thought.
> >
> > Best,
> > Xuannan
> >
> On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su  wrote:
> > Hi folks,
> >
> > The FLIP-36 is updated according to the discussion with Becket. In the
> meantime, any comments are very welcome.
> >
> > If there are no further comments, I would like to start the voting
> > thread by tomorrow.
> >
> > Thanks,
> > Xuannan
> >
> >
> > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su 
> wrote:
> > > > Hi Becket,
> > > 

Re: Improvement idea: Naming the maven modules.

2020-07-15 Thread Kurt Young
+1, I also like this idea.

Best,
Kurt


On Wed, Jul 15, 2020 at 7:10 PM Niels Basjes  wrote:

> Ok,
>
> I'll put up a fix
> https://issues.apache.org/jira/browse/FLINK-18607
>
> Niels
>
> On Wed, Jul 15, 2020 at 11:23 AM Aljoscha Krettek 
> wrote:
>
> > Hi,
> >
> > I like the proposal! I remember that Beam also had more human-readable
> > names for the modules and found that helpful. Also, changing the names
> > shouldn't change anything for users because dependencies are referred to
> by
> > group/artifactId, it really just makes build output and IDE a bit more
> > parseable by humans.
> >
> > Best,
> > Aljoscha
> >
> > On Thu, Jul 9, 2020, at 16:03, Niels Basjes wrote:
> > > Yes,
> > > The tree of modules (usually on the left) shows the maven modules as a
> > tree.
> > > Yet I find that not very easy to use because of the readability of the
> > artifact names and because this becomes very wide (i.e. takes up a lot of
> > screen space).
> > > Having the maven modules with a name you'll get much better readable
> > names in the Maven page (usually on the right).
> > >
> > > Like I said: It's just an idea and in my opinion it makes certain parts
> > of the build more readable.
> > >
> > > Niels
> > >
> > >
> > > On Thu, Jul 9, 2020 at 2:55 PM Chesnay Schepler 
> > wrote:
> > >> Couldn't you just use the tree view and get basically the same thing?
> > >>
> > >> On 09/07/2020 14:51, Niels Basjes wrote:
> > >>> Attempt 2 to get you the images.
> > >>> Now as attachments.
> > >>>
> > >>> Niels
> > >>>
> > >>> On Thu, Jul 9, 2020 at 2:38 PM Chesnay Schepler 
> > wrote:
> >  The images didn't go through.
> > 
> >  On 09/07/2020 14:21, Niels Basjes wrote:
> > > Hi,
> > >
> > > I have the idea that naming the modules would make life easier for
> > the developers.
> > > My main question: Do you agree?
> > > If so I'll clean up my experiment (see below) and put up a pull
> > request.
> > >
> > > When I load the Flink code into IntelliJ I currently see this list
> > of Maven modules:
> > >
> > > image.png
> > >
> > > Now what happens is that the name of the maven module is in almost
> > all cases the name of the artifact.
> > > Only one example in the current code base that deviates from this
> > (highlighted in the screenshot).
> > > This list is shown by IntelliJ in alphabetical order.
> > >
> > > I propose to give all modules a name that should make it a lot
> > easier for the developers to find the module they are looking for.
> > > I've been playing with this idea and what I have now (local only)
> > looks like this:
> > >
> > > image.png
> > >
> > > They are still alphabetically sorted but because of the way I chose
> > the naming it becomes much easier to find the appropriate module.
> > >
> > > Also this causes the build output from Maven to become more
> readable
> > (below).
> > > First thing I noticed from this is that apparently the ordering of
> > the modules and their inter dependencies is off in certain places.
> > >
> > >
> > > [INFO] Flink : Table : API Java bridge  SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Table : API Scala .. SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Table : API Scala bridge ... SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Table : SQL Parser . SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Libraries :  SUCCESS
> > [  0.002 s]
> > > [INFO] Flink : Libraries : CEP  SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Table : Planner  SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Table : SQL Parser Hive  SUCCESS
> > [  0.002 s]
> > > [INFO] Flink : Table : Runtime Blink .. SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Table : Planner Blink .. SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Metrics : JMX .. SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Formats : .. SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Formats : Json . SUCCESS
> > [  0.004 s]
> > > [INFO] Flink : Connectors : Kafka base  SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Formats : Avro . SUCCESS
> > [  0.005 s]
> > > [INFO] Flink : Formats : Csv .. SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Connectors : Kafka 0.10  SUCCESS
> > [  0.009 s]
> > > [INFO] Flink : Connectors : Kafka 0.11  SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Connectors : Elasticsearch base  SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : Connectors : Elasticsearch 5 ... SUCCESS
> > [  0.003 s]
> > > [INFO] Flink : 

Re: [DISCUSS] Sql-client lack support for new features

2020-07-01 Thread Kurt Young
Thanks Jingsong for bringing this up discussion and sorry for the late
reply.

I'm in general +1 for #1, and want to expand the scope of #2.

First of all, I think the approach Jingsong proposed in #2 can help with
covering more
e2e use cases of SQL, which also draws a clean line between how to design
IT cases for
table API and SQL.

This is a good intention to have, but what I want to point out is that it's
still not sufficient yet.
the ultimate problem we are facing is *designing a testing principle for
table & SQL*. I will
share some of my observations and issues I've seen:

1. The lack of SQL client's functionality is definitely the first victim of
this issue. During our
development, I don't know how many developers will take SQL client into
consideration when
they introduce new features, especially those that need to be exposed through
the SQL client.

2. We have a very fuzzy boundary of what kind of tests should be written in
table API or in SQL.
So we end up with two testing packages called "sql" and "table" for the
same testing purpose, with
*random tests created in each package*.

3. We don't have a guideline for developers, especially for new
contributors, about how many tests
they should write and where to put. For example, when we improve the data
type support, say we
start to support higher precision of Timestamp, we don't have a clue how
many tests we should provide.
Should we add some tests for the code generation part? should we add some
tests for the integration test?
Or is it already enough to test it with simple expression tests? We don't
have a clean answer for it.
As a result, we will be busy fixing bugs around new features we introduced,
even if the bug itself looks
very simple, which *will surprise our users with the bad quality of the Flink
SQL engine*.

So I think what we really need here (which is also very urgent IMO) is
*designing
some testing principles*
*for table API & SQL*. Writing SQL's IT case through sql client might be
one of them. If we can keep
improving the testing principles we have, I believe we will have a much
more reliable SQL engine in the
future, which can attract more users to the Flink community.

Best,
Kurt


On Thu, Jun 18, 2020 at 5:14 PM Jark Wu  wrote:

> +1 for #1
> I think this is what we are currently doing, forwarding SQL statements to
> TableEnv#executeSql, e.g. FLINK-17113, FLINK-18059.
> But IMO the SQL CLI specific statements (EXIT, QUIT) should still stay only
> in SQL CLI.
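>
> A tiny sketch of that forwarding idea (simplified pseudo-structure, not the
> actual SQL Client code):
>
> import org.apache.flink.table.api.TableEnvironment;
>
> public class StatementForwarder {
>     // The client only special-cases its own commands and hands everything
>     // else (DDL, DML, queries, SHOW/DESCRIBE, ...) to executeSql.
>     public static void handle(String statement, TableEnvironment tEnv) {
>         String normalized = statement.trim().toUpperCase();
>         if (normalized.equals("QUIT;") || normalized.equals("EXIT;")) {
>             System.exit(0);                  // client-specific command
>         } else if (normalized.startsWith("SET")) {
>             // client-specific session configuration, handled locally
>         } else {
>             tEnv.executeSql(statement).print();
>         }
>     }
> }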
>
> Another idea is that, the reviewer/committer should check tests are both
> added for SQL CLI and TableEnv if it is a new statement.
>
> Best,
> Jark
>
> On Thu, 18 Jun 2020 at 15:40, Rui Li  wrote:
>
> > Thanks Jingsong for bringing up the discussion. Perhaps we can do both #1
> > and #2? I like the idea that SQL Client should just forward SQL
> statements
> > to TableEnvironment. IIUC, with TableEnvironment::executeSql in place,
> most
> > statements can be forwarded. Only a very limited set of statements need
> to
> > be handled by SQL Client, like SET and EXIT.
> >
> > On Thu, Jun 18, 2020 at 3:19 PM Jingsong Li 
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion for the new features lacking support of
> > > Sql-client.
> > >
> > > I've seen many times that the SQL client lacks support for new DDL
> > syntax.
> > > For every new DDL syntax, we need to add support in sql-client: add
> > > a corresponding SqlCommand in sql-client, otherwise the DDL still does
> > > not work in sql-client.
> > >
> > > But it looks like developers always forget to add support in
> sql-client.
> > > Lots of DDL features are just added in the parser and planner, but lack
> > > sql-client support, so users have to wait for the next release. Just like:
> > > https://issues.apache.org/jira/browse/FLINK-7151
> > > https://issues.apache.org/jira/browse/FLINK-17198
> > > https://issues.apache.org/jira/browse/FLINK-15468
> > > https://issues.apache.org/jira/browse/FLINK-15175
> > >
> > > How to solve this?
> > > I think we have two options:
> > >
> > > 1. Unify the parser in sql-client and TableEnvironment truly. Really
> make
> > > sql-client have all abilities from TableEnvironment. sql-client just
> > > forward sql to TableEnvironment. Can it be?
> > > 2. A new testing framework mechanism: We can make sql-related tests
> more
> > > "up front", we can move e2e tests (I think we are doing now) and it
> cases
> > > to sql-client oriented. This may require a new testing framework
> > mechanism,
> > > for example, we can do something like hive sql testing [1] to
> > > sql-client oriented. In this way, the testing can cover more horizontal
> > and
> > > vertical and it is easy to migrate tests from other systems too. And I
> > > think, Flink's DDLs are enough stronger to support pure SQLs testing.
> > >
> > > What do you think?
> > >
> > > [1]https://github.com/apache/hive/tree/master/ql/src/test/queries
> > >
> > > Best,
> > > Jingsong
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-06-23 Thread Kurt Young
Hi Fabian,

I agree with you that implicitly letting the event time be the version of
the table will work in most cases, but not for all. That's the reason I
mentioned the `PERIOD FOR` [1] syntax in my first email, which is already in
the SQL standard to represent the validity of each row in the table.

If the event time can't be used, or multiple event time are defined, we
could still add
this syntax in the future.

What do you think?

[1]
https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
Best,
Kurt


On Tue, Jun 23, 2020 at 9:12 PM Fabian Hueske  wrote:

> Hi everyone,
>
> Every table with a primary key and an event-time attribute provides what is
> needed for an event-time temporal table join.
> I agree that, from a technical point of view, the TEMPORAL keyword is not
> required.
>
> I'm more sceptical about implicitly deriving the versioning information of
> a (temporal) table as the table's only event-time attribute.
> In the query
>
> SELECT *
> FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> WHERE o.currency = r.currency
>
> the syntax of the temporal table join does not explicitly reference the
> version of the temporal rates table.
> Hence, the system needs a way to derive the version of temporal table.
>
> Implicitly using the (only) event-time attribute of a temporal table (rates
> in the example above) to identify the right version works in most cases,
> but probably not in all.
> * What if a table has more than one event-time attribute? (TableSchema is
> designed to support multiple watermarks; queries with interval joins
> produce tables with multiple event-time attributes, ...)
> * What if the table does not have an event-time attribute in its schema but
> the version should only be provided as meta data?
>
> We could add a clause to define the version of a table, such as:
>
> CREATE TABLE rates (
>currency CHAR(3) NOT NULL PRIMARY KEY,
>rate DOUBLE,
>rowtime TIMESTAMP,
>WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
> VERSION (rowtime)
> WITH (...);
>
> The presence of a the VERSION clause (or whatever syntax) would explicitly
> define the version of a (temporal) table.
> It would also render the need for the TEMPORAL keyword superfluous because
> there would be another indicator that a table can be used in a temporal
> table join.
>
> I'm OK with not adding the TEMPORAL keyword, but I recommend that we think
> again about the proposed implicit definition of a table's version and how
> it might limit use in the future.
>
> Cheers,
> Fabian
>
> On Mon, Jun 22, 2020 at 4:14 PM, Jark Wu  wrote:
>
> > I'm also +1 for not adding the TEMPORAL keyword.
> >
> > +1 to make the PRIMARY KEY semantic clear for sources.
> > From my point of view:
> >
> > 1) PRIMARY KEY on changelog souruce:
> > It means that when the changelogs (INSERT/UPDATE/DELETE) are
> materialized,
> > the materialized table should be unique on the primary key columns.
> > Flink assumes messages are in order on the primary key. Flink doesn't
> > validate/enforces the key integrity, but simply trust it (thus NOT
> > ENFORCED).
> > Flink will use the PRIMARY KEY for some optimization, e.g. use the
> PRIMARY
> > KEY to update the materilized state by key in temporal join operator.
> >
> > 2) PRIMARY KEY on insert-only source:
> > I prefer to have the same semantic to the batch source and changelog
> > source, that it implies that records are not duplicate on the primary
> key.
> > Flink just simply trust the primary key constraint, and doesn't valid it.
> > If there is duplicate primary keys with INSERT changeflag, then result of
> > Flink query might be wrong.
> >
> > If this is a TEMPORAL TABLE FUNCTION scenario, that source emits
> duplicate
> > primary keys with INSERT changeflag, when we migrate this case to
> temporal
> > table DDL,
> > I think this source should emit INSERT/UPDATE (UPSERT) messages instead
> of
> > INSERT-only messages,  e.g. a Kafka compacted topic source?
> >
> > Best,
> > Jark
> >
> >
> > On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf 
> wrote:
> >
> > > Hi everyone,
> > >
> > > I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
> > >
> > > Best,
> > >
> > > Konstantin
> > >
> > > On Mon, Jun 22, 2020 at 10:53 AM Kurt Young  wrote:
> > >
> > > > I agree with Timo, semantic about primary key needs more thought and
> > > > discussion, especially after FLIP-95 and FLIP-105.
> > > >
> > > > B

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-06-22 Thread Kurt Young
I agree with Timo, semantic about primary key needs more thought and
discussion, especially after FLIP-95 and FLIP-105.

Best,
Kurt


On Mon, Jun 22, 2020 at 4:45 PM Timo Walther  wrote:

> Hi Leonard,
>
> thanks for the summary.
>
> After reading all of the previous arguments and working on FLIP-95. I
> would also lean towards the conclusion of not adding the TEMPORAL keyword.
>
> After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can be
> represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The FOR
> SYSTEM_TIME AS OF t would trigger the internal materialization and
> "temporal" logic.
>
> However, we should discuss the meaning of PRIMARY KEY again in this
> case. In a TEMPORAL TABLE scenario, the source would emit duplicate
> primary keys with INSERT changeflag but at different point in time.
> Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
> changelog semantics of FLIP-95 and FLIP-105 don't work well with a
> primary key declaration.
>
> Regards,
> Timo
>
>
> On 20.06.20 17:08, Leonard Xu wrote:
> > Hi everyone,
> >
> > Thanks for the nice discussion. I’d like to move forward the work,
> please let me simply summarize the main opinion and current divergences.
> >
> > 1. The agreements have been achieved:
> >
> > 1.1 The motivation we're discussing temporal table DDL is just for
> creating temporal table in pure SQL to replace pre-process temporal table
> in YAML/Table API for usability.
> > 1.2 The reason we use "TEMPORAL" keyword rather than “PERIOD FOR
> SYSTEM_TIME” is to make it easier for users to understand.
> > 1.3 An append-only table can be converted to a changelog table, which has
> been discussed in FLIP-105; we assume the following temporal table comes
> from a changelog (Jark, Fabian, Timo).
> > 1.4 For temporal join syntax, using "FOR SYSTEM_TIME AS OF x" instead of
> the current `LATERAL TABLE(rates(x))`  has come to an agreement(Fabian,
> Timo, Seth, Konstantin, Kurt).
> >
> > 2. The small divergence :
> >
> > About the definition syntax of the temporal table,
> >
> > CREATE [TEMPORAL] TABLE rates (
> > currency CHAR(3) NOT NULL PRIMARY KEY,
> > rate DOUBLE,
> > rowtime TIMESTAMP,
> > WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > WITH (...);
> >
> > there is a small divergence on whether to add the "TEMPORAL" keyword or not.
> >
> > 2.1  one opinion is using "CREATE TEMPORAL TABLE" (Timo, Fabian, Seth),
> the main advantages are:
> > (1)"TEMPORAL" keyword is intuitive to indicate the history tracking
> semantics.
> > (2)"TEMPORAL" keyword illustrates that queries can visit the previous
> versions of a table like other DBMS use "PERIOD FOR SYSTEM_TIME" keyword.
> >
> > 2.2 the other is using "CREATE TABLE"(Kurt), the main advantages are:
> > (1)Just primary key and time attribute can track previous versions of a
> table well.
> > (2)The temporal behavior is triggered by temporal join syntax rather
> than in DDL, all Flink DDL table are dynamic table logically including
> temporal table. If we decide to use "TEMPORAL" keyword and treats changelog
> as temporal table, other tables backed queue like Kafka should also use
> "TEMPORAL" keyword.
> >
> >
> > IMO, the statement “CREATE TEMPORARY TEMPORAL TABLE...” that follows from 2.1
> may confuse users a lot. If we take a second to think about it: for source/sink
> tables, which may be backed by a queue (like Kafka) or a DB (like MySQL), we did
> not add any keyword in the DDL to specify whether they are sources or sinks, and it works well.
> > I think the temporal table is the third such role: a Kafka data source or a DB data
> source can act as a source/sink/temporal table depending on the
> position/syntax where the user puts it in the query. The above rates table
> >  - can be a source table if user put it at `SELECT * FROM rates;`
> >  - can be a temporal table if user put it at `SELECT * FROM orders
> JOIN rates FOR SYSTEM_TIME AS OF orders.proctime
> >   ON orders.currency = rates.currency;`
> >  - can be sink table if user put is at `INSERT INTO rates SELECT *
> FROM …; `
> >  From these cases, we find that all tables defined in Flink should be
> dynamic tables logically; the source/sink/temporal role depends on the
> position/syntax in the user’s query.
> >In fact we have used similar syntax for current lookup table, we
> didn’t add “LOOKUP" or “TEMPORAL" keyword for lookup table and trigger the
> temporal join from the position/syntax(“FOR SYSTEM_TIME AS OF x") in query.
> >
> > So, I prefer to resolve the small divergence with “CREATE TABLE”, which
> > (1) is conceptually more unified with our source/sink/temporal dynamic
> tables,
> > (2) is aligned with the current lookup table,
> > (3) also makes users learn fewer keywords.
> >
> > WDYT?
> >
> > Best,
> > Leonard Xu
> >
> >
>
>


[jira] [Created] (FLINK-18224) Add document about sql client's tableau result mode

2020-06-09 Thread Kurt Young (Jira)
Kurt Young created FLINK-18224:
--

 Summary: Add document about sql client's tableau result mode
 Key: FLINK-18224
 URL: https://issues.apache.org/jira/browse/FLINK-18224
 Project: Flink
  Issue Type: Task
  Components: Documentation, Table SQL / Client
Reporter: Kurt Young
Assignee: Kurt Young
 Fix For: 1.11.0








Re: [ANNOUNCE] New Flink Committer: Benchao Li

2020-06-09 Thread Kurt Young
Congratulations, Benchao!

Best,
Kurt


On Tue, Jun 9, 2020 at 2:46 PM Guanghui Zhang  wrote:

> Congratulations, Benchao !!!
>
> Leonard Xu  wrote on Tue, Jun 9, 2020 at 2:39 PM:
>
> > Congratulations, Benchao !
> >
> > Best,
> > Leonard Xu
> >
> >
> >
> > > On Jun 9, 2020, at 14:36, Jiayi Liao  wrote:
> > >
> > > Congratulations!
> > >
> > >
> > > Best,
> > > Jiayi Liao
> > >
> > > On Tue, Jun 9, 2020 at 2:32 PM Dian Fu  wrote:
> > >
> > >> Congrats Benchao!
> > >>
> > >> Regards,
> > >> Dian
> > >>
> > >>> On Jun 9, 2020, at 2:30 PM, Xintong Song  wrote:
> > >>>
> > >>> Congratulations, Benchao~!
> > >>>
> > >>> Thank you~
> > >>>
> > >>> Xintong Song
> > >>>
> > >>>
> > >>>
> > >>> On Tue, Jun 9, 2020 at 2:16 PM Jingsong Li 
> > >> wrote:
> > >>>
> >  Congratulations, Benchao. Well deserved!
> > 
> >  Best,
> >  Jingsong Lee
> > 
> >  On Tue, Jun 9, 2020 at 2:13 PM Forward Xu 
> > >> wrote:
> > 
> > > Congratulations, Benchao
> > >
> > > Best,
> > > Forward
> > >
> > > Jark Wu  wrote on Tue, Jun 9, 2020 at 2:10 PM:
> > >
> > >> Hi everyone,
> > >>
> > >> On behalf of the PMC, I'm very happy to announce Benchao Li as a
> new
> > > Apache
> > >> Flink committer.
> > >>
> > >> Benchao started contributing to Flink in late 2018. He is very active
> > >> in the Flink SQL component, and has also participated in many
> > >> discussions and bug fixes. Over the past few months, he helped
> > >> tremendously in answering user questions on the mailing list.
> > >>
> > >> Please join me in congratulating Benchao for becoming a Flink
> >  committer!
> > >>
> > >> Thanks,
> > >> Jark
> > >>
> > >
> > 
> > 
> >  --
> >  Best, Jingsong Lee
> > 
> > >>
> > >>
> >
> >
>


Re: [DISCUSS] FLIP-84 Feedback Summary

2020-05-17 Thread Kurt Young
esults as a table.
> >>>>>> > > >
> >>>>>> > > > On the specifics I don't know enough, but Fabian's suggestions
> >>>>>> > > > seem to make sense to me.
> >>>>>> > > >
> >>>>>> > > > Aljoscha
> >>>>>> > > >
> >>>>>> > > > On 29.04.20 10:56, Fabian Hueske wrote:
> >>>>>> > > > > Hi Godfrey,
> >>>>>> > > > >
> >>>>>> > > > > Thanks for starting this discussion!
> >>>>>> > > > >
> >>>>>> > > > > In my mind, WATERMARK is a property (or constraint) of a
> >>>>>> field, just
> >>>>>> > > like
> >>>>>> > > > > PRIMARY KEY.
> >>>>>> > > > > Take this example from MySQL:
> >>>>>> > > > >
> >>>>>> > > > > mysql> CREATE TABLE people (id INT NOT NULL, name
> >>>>>> VARCHAR(128) NOT
> >>>>>> > > NULL,
> >>>>>> > > > > age INT, PRIMARY KEY (id));
> >>>>>> > > > > Query OK, 0 rows affected (0.06 sec)
> >>>>>> > > > >
> >>>>>> > > > > mysql> describe people;
> >>>>>> > > > > +-------+--------------+------+-----+---------+-------+
> >>>>>> > > > > | Field | Type         | Null | Key | Default | Extra |
> >>>>>> > > > > +-------+--------------+------+-----+---------+-------+
> >>>>>> > > > > | id    | int          | NO   | PRI | NULL    |       |
> >>>>>> > > > > | name  | varchar(128) | NO   |     | NULL    |       |
> >>>>>> > > > > | age   | int          | YES  |     | NULL    |       |
> >>>>>> > > > > +-------+--------------+------+-----+---------+-------+
> >>>>>> > > > > 3 rows in set (0.01 sec)
> >>>>>> > > > >
> >>>>>> > > > > Here, PRIMARY KEY is marked in the Key column of the id
> field.
> >>>>>> > > > > We could do the same for watermarks by adding a Watermark
> >>>>>> column.
> >>>>>> > > > >
> >>>>>> > > > > Best, Fabian
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > > > On Wed, Apr 29, 2020 at 10:43 AM godfrey he <
> >>>>>> > > > godfre...@gmail.com> wrote:
> >>>>>> > > > >
> >>>>>> > > > >> Hi everyone,
> >>>>>> > > > >>
> >>>>>> > > > >> I would like to bring up a discussion about the result type
> >>>>>> of
> >>>>>> > > describe
> >>>>>> > > > >> statement,
> >>>>>> > > > >> which is introduced in FLIP-84[1].
> >>>>>> > > > >> In the previous version, we defined the result type of the
> >>>>>> > > > >> `describe` statement as a single column, as follows
> >>>>>> > > > >>
> >>>>>> > > > >> Statement
> >>>>>> > > > >>
> >>>>>> > > > >> Result Schema
> >>>>>> > > > >>
> >>>>>> > > > >> Result Value
> >>>>>> > > > >>
> >>>>>> > > > >> Result Kind
> >>>>>> > > > >>
> >>>>>> > > > >> Examples
> >>>>>> > > > >>
> >>>>>> > > > >> DESCRIBE xx
> >>>>>> > > > >>
> >>>>>> > > > >> field name: result
> >>>>>> > > > >>
> >>>>>> > > > >> field type: VARCHAR(n)
> >>>>>> > > &g

[jira] [Created] (FLINK-17756) Drop table/view shouldn't take effect on each other

2020-05-16 Thread Kurt Young (Jira)
Kurt Young created FLINK-17756:
--

 Summary: Drop table/view shouldn't take effect on each other
 Key: FLINK-17756
 URL: https://issues.apache.org/jira/browse/FLINK-17756
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Kurt Young
Assignee: Kurt Young
 Fix For: 1.11.0


Currently "DROP VIEW" can successfully drop a table, and "DROP TABLE" can 
successfully a view. We should disable this.
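
For illustration, a minimal sketch of the intended behavior (the table and
view names are placeholders):

CREATE TABLE t (id INT) WITH (...);
CREATE VIEW v AS SELECT * FROM t;

DROP VIEW t;   -- currently succeeds, should fail because t is a table
DROP TABLE v;  -- currently succeeds, should fail because v is a view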





[jira] [Created] (FLINK-17749) Remove fromTableSource method from TableEnvironment

2020-05-15 Thread Kurt Young (Jira)
Kurt Young created FLINK-17749:
--

 Summary: Remove fromTableSource method from TableEnvironment 
 Key: FLINK-17749
 URL: https://issues.apache.org/jira/browse/FLINK-17749
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Kurt Young
Assignee: Kurt Young
 Fix For: 1.11.0








[jira] [Created] (FLINK-17748) Remove registration of TableSource/TableSink in Table Env

2020-05-15 Thread Kurt Young (Jira)
Kurt Young created FLINK-17748:
--

 Summary: Remove registration of TableSource/TableSink in Table Env 
 Key: FLINK-17748
 URL: https://issues.apache.org/jira/browse/FLINK-17748
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Kurt Young
Assignee: Zhenghua Gao
 Fix For: 1.11.0








Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-13 Thread Kurt Young
Thanks for sharing your opinion. I can see from your description that we
have some very small divergences. I think it would be a good idea to
discuss these first.

Let's put aside the table version for now, and only discuss whether a DDL
table should be treated by Flink as a DBMS-style table or as a changelog of
such a table. I would say both, but the latter will be the majority case.

IMO this slight difference has not been distinguished clearly enough in the
past. The biggest reason behind this is that we only supported "append-only"
tables as sources in the past. If you take a second and think about an
append-only table, you will find there is not much difference between
"treating it as a DBMS-style table" and "treating it as a table's
changelog", because no matter which of these two angles you view it from,
you will see exactly the same thing. That's why we didn't need to
distinguish them clearly, and most things just worked fine.

Things have changed since we introduced FLIP-95 and FLIP-105. With these
two FLIPs, we are able to interpret binlog-like messages from the source,
and start to emit append / update / delete messages from the source. I
would say the balance has leaned toward the changelog side of these two
angles. It doesn't make much sense for a DBMS table itself to contain some
kind of update and delete messages.

Although the balance has shifted, the two situations still exist, because
some kinds of tables are still DBMS-style tables, e.g., tables from MySQL
or HBase. I don't have a strong opinion about how to distinguish them; the
TEMPORAL keyword seems fine to me. But if we introduce this keyword, we
need to decide whether an unbounded, queue-backed table is TEMPORAL or not.
IMO it is a TEMPORAL table, because it looks more like a changelog than a
DBMS table to me. If this is the case, I'm afraid we would need to add this
keyword to most of the tables users have already declared, e.g. almost all
Kafka tables.
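
For example, a typical Kafka-backed table would then have to be declared
roughly like this (a hypothetical sketch only, reusing the "..." style for
connector options from the examples in this thread):

CREATE TEMPORAL TABLE kafka_orders (
  order_id BIGINT,
  currency CHAR(3),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  ...
);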

But before we really decide what to do, I'm also curious to hear your
opinion on the small divergence I described above.

Best,
Kurt


On Thu, May 14, 2020 at 1:27 AM Fabian Hueske  wrote:

> I think Flink should behave similarly to other DBMSs.
>
> Other DBMSs do not allow querying the history of a table, even though the
> DBMS has seen all changes to the table (as transactions, or directly as a
> changelog if the table was replicated) and recorded them in its log.
> You need to declare a table as TEMPORAL to be able to look up previous
> versions.
>
> Flink is in a very similar situation.
> Even though we've seen the physical data of all changes and could also
> store it for some time, I think we should only allow queries against
> previous versions of a table (with FOR SYSTEM_TIME AS OF) if the table was
> defined as TEMPORAL.
>
> IMO this is not about having the data to return a previous version of a
> table (other DBMSs have the data as well); it's about whether the user
> should tell the system to allow access to the table's history or not.
> As I said before, we could of course declare that all tables are
> automatically temporal and versioned on the only event-time attribute (but
> what if there were more than one?), but I personally don't like such
> implicit conventions.
> I don't have a concrete proposal for a syntax to declare the version
> attribute of a table, but I agree that the "PERIOD FOR SYSTEM_TIME" syntax
> doesn't look very suitable for our purposes.
> I'm sure we can come up with a better syntax for this.
>
> Best, Fabian
>
On Sat, May 9, 2020 at 3:57 AM Kurt Young  wrote:
>
>> All tables described by Flink's DDL are dynamic tables. But a dynamic
>> table is more of a logical concept than a physical thing. Physically, a
>> dynamic table has two different forms: one is a materialized table which
>> changes over time (e.g. a database table, an HBase table), the other is a
>> stream which represents change logs and is typically stored in a message
>> queue (e.g. Kafka). For the latter, I think the records already represent
>> the history of the dynamic table, based on stream-table duality.
>>
>> So regarding:
>> > Of course we could define that Flink implicitly tracks the (recent,
>> i.e., within watermark bounds) history of all dynamic tables.
>> I don't think this is Flink implicitly tracking the history of the
>> dynamic table; rather, the physical data of the table is already the
>> history itself. What Flink does is read the history out and organize it
>> so that it is prepared for further operations.
>>
>> I agree with another implicit convention I took though, which treats the
>> event time as the version of the dyn

[jira] [Created] (FLINK-17635) Add documentation about view support

2020-05-12 Thread Kurt Young (Jira)
Kurt Young created FLINK-17635:
--

 Summary: Add documentation about view support 
 Key: FLINK-17635
 URL: https://issues.apache.org/jira/browse/FLINK-17635
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Kurt Young
 Fix For: 1.11.0








[jira] [Created] (FLINK-17599) Update documents due to FLIP-84

2020-05-10 Thread Kurt Young (Jira)
Kurt Young created FLINK-17599:
--

 Summary: Update documents due to FLIP-84
 Key: FLINK-17599
 URL: https://issues.apache.org/jira/browse/FLINK-17599
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Kurt Young








Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-08 Thread Kurt Young
All tables described by Flink's DDL are dynamic tables. But a dynamic
table is more of a logical concept than a physical thing. Physically, a
dynamic table has two different forms: one is a materialized table which
changes over time (e.g. a database table, an HBase table), the other is a
stream which represents change logs and is typically stored in a message
queue (e.g. Kafka). For the latter, I think the records already represent
the history of the dynamic table, based on stream-table duality.

So regarding:
> Of course we could define that Flink implicitly tracks the (recent, i.e.,
within watermark bounds) history of all dynamic tables.
I don't think this is Flink implicitly tracking the history of the dynamic
table; rather, the physical data of the table is already the history
itself. What Flink does is read the history out and organize it so that it
is prepared for further operations.

I agree that I did adopt another implicit convention, though, which treats
the event time as the version of the dynamic table. Strictly speaking, we
should use the separate "PERIOD FOR SYSTEM_TIME" syntax [1] to indicate the
version of the table. I've been thinking about this for quite a bit, and it
turns out that this semantic is too similar to Flink's event time. It will
cause more trouble for users to understand what this means if we treat
event time and "PERIOD FOR SYSTEM_TIME" differently. And I'm also afraid
that we would introduce lots of bugs, because not all developers will
understand this easily.
[1]
https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
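
For reference, the SQL Server DDL behind [1] looks roughly like this (a
simplified sketch from the linked docs; details may differ):

CREATE TABLE dbo.rates (
  currency CHAR(3) NOT NULL PRIMARY KEY,
  rate FLOAT,
  valid_from DATETIME2 GENERATED ALWAYS AS ROW START NOT NULL,
  valid_to DATETIME2 GENERATED ALWAYS AS ROW END NOT NULL,
  PERIOD FOR SYSTEM_TIME (valid_from, valid_to)
)
WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.rates_history));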

Best,
Kurt


On Sat, May 9, 2020 at 5:32 AM Fabian Hueske  wrote:

> I think we need the TEMPORAL TABLE syntax because temporal tables are
> conceptually more than just regular tables.
> In addition to being a table that always holds the latest values (and
> can thereby serve as input to a continuous query), the system also needs to
> track the history of such a table to be able to serve different versions of
> the table (as requested by FOR SYSTEM_TIME AS OF).
>
> Of course we could define that Flink implicitly tracks the (recent, i.e.,
> within watermark bounds) history of all dynamic tables.
> However, there's one more thing the system needs to know to be able to
> correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to
> use as version of the temporal table.
> IMO it would be good to make this explicit, especially if there is a plan
> to eventually support multiple event-time attributes / watermarks
> on a table.
> Just using the only event time attribute would be a bit too much
> convention magic for my taste (others might of course have a different
> opinion on this subject).
>
> So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE
> statement if we agree on a few implicit conventions (implicit history table
> + implicit versioning attribute).
> I'm not a big fan of such conventions and think it's better to make such
> things explicit.
>
> For temporal joins with processing time semantics, we can use regular
> dynamic tables without declaring them as TEMPORAL since we don't need a
> history table to derive the current version.
> AFAIK, these are already the semantics we use for LookupTableSource.
>
> Regarding the question of append-only tables and temporal tables, I'd like
> to share some more thoughts.
> As I said above, a temporal table consists of a regular dynamic table A
> that holds the latest version and a table H that holds the history of A.
> 1) When defining a temporal table based on a regular dynamic table (with a
> primary key), we provide A and Flink automatically maintains H (bounded
> by watermarks)
> 2) When defining a temporal table based on an append-only table, Flink
> ingests H and we use the temporal table function to turn it into a dynamic
> table with a primary key, i.e., into A. This conversion could also be done
> during ingestion by treating the append-only stream as an upsert changelog
> and converting it into a dynamic table with a PK, i.e., into table A (just
> as in case 1).
>
> As Jark said "converting append-only table into changelog table" was moved
> to future work.
> Until then, we could only define TEMPORAL TABLE on a table that is derived
> from a proper changelog stream with a specific encoding.
> The TEMPORAL VIEW would be a shortcut which would allow us to perform the
> conversion in Flink SQL (and not within the connector) and to define the
> temporal properties on the result of the view.
>
> Cheers,
> Fabian
>
>
>
> On Fri, May 8, 2020 at 8:29 AM Kurt Young  wrote:
>
>> I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?
>>
>> According to Fabian's first mail:
>>

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Kurt Young
+dev 

Best,
Kurt


On Fri, May 8, 2020 at 3:35 PM Caizhi Weng  wrote:

> Hi Jeff,
>
> Thanks for the response. However, I'm using executeAsync so that I can run
> the job asynchronously and get a JobClient to monitor the job. JobListener
> only works for the synchronous execute method. Is there another way to
> achieve this?
>
> Jeff Zhang  wrote on Fri, May 8, 2020 at 3:29 PM:
>
>> I use JobListener#onJobExecuted to be notified that the Flink job is
>> done. It is pretty reliable for me; the only exception is when the client
>> process is down.
>>
>> BTW, the reason you see the ApplicationNotFound exception is that the
>> YARN app has terminated, which means the Flink cluster is shut down,
>> while in standalone mode the Flink cluster is always up.
>>
>>
>> Caizhi Weng  wrote on Fri, May 8, 2020 at 2:47 PM:
>>
>>> Hi dear Flink community,
>>>
>>> I would like to determine whether a job has finished (whether
>>> successfully or exceptionally) in my code.
>>>
>>> I used to think that JobClient#getJobStatus is a good idea, but I found
>>> that it behaves quite differently under different execution environments.
>>> For example, under a standalone session cluster it will return the FINISHED
>>> status for a finished job, while under a YARN per-job cluster it will throw
>>> an ApplicationNotFound exception. I'm afraid that there might be other
>>> behaviors in other environments.
>>>
>>> So what's the best practice to determine whether a job has finished or
>>> not? Note that I'm not waiting for the job to finish. If the job hasn't
>>> finished I would like to know it and do something else.
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-08 Thread Kurt Young
I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?

According to Fabian's first mail:

> Hence, the requirements for a temporal table are:
> * The temporal table has a primary key / unique attribute
> * The temporal table has a time-attribute that defines the start of the
> validity interval of a row (processing time or event time)
> * The system knows that the history of the table is tracked and can infer
> how to look up a version.

I think a primary key plus a proper event-time attribute is already
sufficient. So a join query would look like:

"Fact join Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id =
Dim.id"

would mean: for every record belonging to Fact, use Fact.some_event_time as
Dim's version (which keeps only the records from the Dim table with event
time less than or equal to Fact.some_event_time, and only one record per
primary key).

The temporal behavior is actually triggered by the join syntax "FOR
SYSTEM_TIME AS OF Fact.some_event_time",
not by the DDL definition.
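
Putting it together, a minimal end-to-end sketch under this proposal (the
table names, columns, and connector options are placeholders):

CREATE TABLE Dim (
  id BIGINT NOT NULL PRIMARY KEY,
  rate DOUBLE,
  dim_time TIMESTAMP(3),
  WATERMARK FOR dim_time AS dim_time - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE Fact (
  id BIGINT,
  amount DOUBLE,
  some_event_time TIMESTAMP(3),
  WATERMARK FOR some_event_time AS some_event_time - INTERVAL '5' SECOND
) WITH (...);

SELECT f.id, f.amount * d.rate AS converted_amount
FROM Fact AS f
JOIN Dim FOR SYSTEM_TIME AS OF f.some_event_time AS d
  ON f.id = d.id;

No TEMPORAL keyword is needed in the DDL here; the FOR SYSTEM_TIME AS OF
clause alone selects the version of Dim that was valid as of
f.some_event_time.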

Best,
Kurt


On Fri, May 8, 2020 at 10:51 AM Jark Wu  wrote:

> Hi,
>
> I agree what Fabian said above.
> Besides, IMO, (3) has a lower priority and will involve many more things.
> It makes sense to me to do it in two phases.
>
> Regarding (3), the key point in converting an append-only table into a
> changelog table is that the framework needs to know the operation type,
> so we introduced a special CREATE VIEW syntax to do it in the documentation
> [1]. Here is an example:
>
> -- my_binlog table is registered as an append-only table
> CREATE TABLE my_binlog (
>   before ROW<...>,
>   after ROW<...>,
>   op STRING,
>   op_ms TIMESTAMP(3)
> ) WITH (
>   'connector.type' = 'kafka',
>   ...
> );
>
> -- interpret my_binlog as a changelog on the op_type and id key
> CREATE VIEW my_table AS
>   SELECT
> after.*
>   FROM my_binlog
>   CHANGELOG OPERATION BY op
>   UPDATE KEY BY (id);
>
> -- my_table will materialize the insert/delete/update changes
> -- if we have 4 records in dbz that
> -- a create for 1004
> -- an update for 1004
> -- a create for 1005
> -- a delete for 1004
> > SELECT COUNT(*) FROM my_table;
> +----------+
> | COUNT(*) |
> +----------+
> |        1 |
> +----------+
>
> Best,
> Jark
>
> [1]:
>
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>
>
> On Fri, 8 May 2020 at 00:24, Fabian Hueske  wrote:
>
> > Thanks for the summary Konstantin.
> > I think you got all points right.
> >
> > IMO, the way forward would be to work on a FLIP to define
> > * the concept of temporal tables,
> > * how to feed them from retraction tables
> > * how to feed them from append-only tables
> > * their specification with CREATE TEMPORAL TABLE,
> > * how to use temporal tables in temporal table joins
> > * how (if at all) to use temporal tables in other types of queries
> >
> > We would keep the LATERAL TABLE syntax because it is used for regular
> > table-valued functions.
> > However, we would probably remove the TemporalTableFunction (which is a
> > built-in table-valued function) after deprecating it for a while.
> >
> > Cheers, Fabian
> >
> > On Thu, May 7, 2020 at 6:03 PM Konstantin Knauf <
> > kna...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks everyone for joining the discussion on this. Please let me
> >> summarize
> >> what I have understood so far.
> >>
> >> 1) For joining an append-only table and a temporal table, the "FOR
> >> SYSTEM_TIME AS OF ..." syntax seems to be preferred (Fabian, Timo,
> >> Seth).
> >>
> >> 2) To define a temporal table based on a changelog stream from an
> >> external system, CREATE TEMPORAL TABLE (as suggested by Timo/Fabian)
> >> could be used.
> >> 3) In order to also support temporal tables derived from an append-only
> >> stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian)
> >> or
> >> need to have a way to convert an append-only table into a changelog
> table
> >> (briefly discussed in [1]). It is not completely clear to me how a
> >> temporal table based on an append-only table would be defined with the
> >> syntax proposed in [1] and 2). @Jark Wu  could you elaborate a bit on
> >> that?
> >>
> >> How do we move forward with this?
> >>
> >> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense.
> >> What do you think?
> >> * If we proceed like this, what would this mean for the current syntax
> >> of LATERAL TABLE? Would we keep it? Would we eventually deprecate and
> >> drop it? Since only after 3) would we be on par with the current
> >> temporal table function join, I assume we could only drop it after that.
> >>
> >> Thanks, Konstantin
> >>
> >> [1]
> >>
> >>
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
> >>
> >>
> >> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu  wrote:
> >>
> >> > Hi Fabian,
> >> >
> >> > Just to clarify a little bit, we decided to move the "converting
> >> > 
