Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-09 Thread Timo Walther
convergence for similar problems discussed as part of this FLIP, specifically
periodic backfill, bootstrap + nearline update use cases using single
implementation of business logic (single script).

Few clarifying questions:

1. In the proposed FLIP, given the example for the dynamic table, do the
data sources always come from a single lake storage such as Paimon or does
the same proposal solve for 2 disparate storage systems like Kafka and
Iceberg where Kafka events are ETLed to Iceberg similar to Paimon?
Basically the lambda architecture that is mentioned in the FLIP as well.
I'm wondering if it is possible to switch b/w sources based on the
execution mode, for eg: if it is backfill operation, switch to a data lake
storage system like Iceberg, otherwise an event streaming system like
Kafka.
2. What happens in the context of a bootstrap (batch) + nearline update
(streaming) case that are stateful applications? What I mean by that is,
will the state from the batch application be transferred to the nearline
application after the bootstrap execution is complete?

Regards
Venkata krishnan


On Mon, Mar 25, 2024 at 8:03 PM Ron liu wrote:



Hi, Timo

Thanks for your quick response, and your suggestion.

Yes, this discussion has turned into confirming whether it's a special
table or a special MV.

1. The key problem with MVs is that they don't support modification, so I
prefer it to be a special table. Although the periodic refresh behavior is
more characteristic of an MV, since we are already a special table,
supporting periodic refresh behavior is quite natural, similar to
Snowflake dynamic tables.

2. Regarding the keyword UPDATING, since the current Regular Table is a
Dynamic Table, which implies support for updating through Continuous
Query, I think it is redundant to add the keyword UPDATING. In addition,
UPDATING cannot reflect the Continuous Query part and cannot express the
purpose we want: to simplify the data pipeline through Dynamic Table +
Continuous Query.

3. From the perspective of the SQL standard definition, I can understand
your concerns about Derived Table, but is it possible to make a slight
adjustment to meet our needs? Additionally, as Lincoln mentioned, the
Google Looker platform has introduced Persistent Derived Table, and there
are precedents in the industry; could Derived Table be a candidate?


Of course, look forward to your better suggestions.

Best,
Ron



On Mon, Mar 25, 2024 at 6:49 PM Timo Walther wrote:


After thinking about this more, this discussion boils down to whether
this is a special table or a special materialized view. In both cases,
we would need to add a special keyword:

Either

CREATE UPDATING TABLE

or

CREATE UPDATING MATERIALIZED VIEW

I still feel that the periodic refreshing behavior is closer to a MV. If
we add a special keyword to MV, the optimizer would know that the data
cannot be used for query optimizations.

I will ask more people for their opinion.

Regards,
Timo


On 25.03.24 10:45, Timo Walther wrote:

Hi Ron and Lincoln,

thanks for the quick response and the very insightful discussion.

 > we might limit future opportunities to optimize queries
 > through automatic materialization rewriting by allowing data
 > modifications, thus losing the potential for such optimizations.

This argument makes a lot of sense to me. Due to the updates, the system
is not in full control of the persisted data. However, the system is
still in full control of the job that powers the refresh. So if the
system manages all updating pipelines, it could still leverage automatic
materialization rewriting but without leveraging the data at rest (only
the data in flight).

 > we are considering another candidate, Derived Table, the term 'derive'
 > suggests a query, and 'table' retains modifiability. This approach
 > would not disrupt our current concept of a dynamic table

I did some research on this term. The SQL standard uses the term
"derived table" extensively (defined in section 4.17.3). Thus, a lot of
vendors adopt this for simply referring to a table within a subclause:

https://dev.mysql.com/doc/refman/8.0/en/derived-tables.html

https://infocenter.sybase.com/help/topic/com.sybase.infocenter.dc32300.1600/doc/html/san1390612291252.html

https://www.c-sharpcorner.com/article/derived-tables-vs-common-table-expressions/

https://stackoverflow.com/questions/26529804/what-are-the-derived-tables-in-my-explain-statement

[jira] [Created] (FLINK-35001) Avoid scientific notation for DOUBLE to STRING

2024-04-03 Thread Timo Walther (Jira)
Timo Walther created FLINK-35001:


 Summary: Avoid scientific notation for DOUBLE to STRING
 Key: FLINK-35001
 URL: https://issues.apache.org/jira/browse/FLINK-35001
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Timo Walther


Flink currently uses Java semantics for some casts.

When executing:

{code}
SELECT CAST(CAST('19586232024.0' AS DOUBLE) AS STRING);
{code}

Leads to
{code}
1.9586232024E10
{code}

However, other vendors such as Postgres or MySQL return {{19586232024}}.

We should reconsider this behavior for consistency.
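Until the cast itself is changed, a possible workaround is to route the value through DECIMAL, assuming (not verified here) that DECIMAL-to-STRING casts always print plain notation; the precision and scale below are illustrative:

{code}
-- hedged workaround sketch: render the value without scientific notation
SELECT CAST(CAST(CAST('19586232024.0' AS DOUBLE) AS DECIMAL(20, 1)) AS STRING);
-- expected: 19586232024.0
{code}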



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-02 Thread Timo Walther

+1 (binding)

Thanks,
Timo

On 29.03.24 17:30, Hao Li wrote:

Hi devs,

I'd like to start a vote on the FLIP-437: Support ML Models in Flink
SQL [1]. The discussion thread is here [2].

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL

[2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn

Thanks,
Hao





Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-28 Thread Timo Walther

Hi everyone,

I updated the FLIP according to this discussion.

@Hao Li: Let me know if I made a mistake somewhere. I added some
additional explaining comments about the new PTF syntax.


There are no further objections from my side. If nobody objects, Hao,
feel free to start the voting tomorrow.


Regards,
Timo


On 28.03.24 16:30, Jark Wu wrote:

Thanks, Hao,

Sounds good to me.

Best,
Jark

On Thu, 28 Mar 2024 at 01:02, Hao Li  wrote:


Hi Jark,

I think we can start with supporting popular model providers such as
openai, azureml, sagemaker for remote models.
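As a rough sketch only (the option keys below are illustrative and not the
FLIP's final set), a remote model for one of those providers could be
declared along these lines:

CREATE MODEL `classifier_model`
  INPUT (f1 DOUBLE, f2 DOUBLE)
  OUTPUT (label STRING)
WITH (
  'provider' = 'openai',       -- or 'azureml', 'sagemaker'
  'task' = 'classification',   -- deduced from options rather than DDL keywords
  'endpoint' = 'https://...'   -- illustrative placeholder
);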

Thanks,
Hao

On Tue, Mar 26, 2024 at 8:15 PM Jark Wu  wrote:


Thanks for the PoC and updating,

The final syntax looks good to me, at least it is a nice and concise first
step.

SELECT f1, f2, label FROM
ML_PREDICT(
  input => `my_data`,
  model => `my_cat`.`my_db`.`classifier_model`,
  args => DESCRIPTOR(f1, f2));

Besides, what built-in models will we support in the FLIP? This might be
important because it relates to what use cases can run with the new Flink
version out of the box.

Best,
Jark

On Wed, 27 Mar 2024 at 01:10, Hao Li  wrote:


Hi Timo,

Yeah. For `primary key` and `from table(...)` those are explicitly matched
in parser: [1].


SELECT f1, f2, label FROM

ML_PREDICT(
  input => `my_data`,
  model => `my_cat`.`my_db`.`classifier_model`,
  args => DESCRIPTOR(f1, f2));

This named argument syntax looks good to me. It can be supported together
with

SELECT f1, f2, label FROM ML_PREDICT(`my_data`,
`my_cat`.`my_db`.`classifier_model`,DESCRIPTOR(f1, f2));

Sure. Will let you know once updated the FLIP.

[1]
https://github.com/confluentinc/flink/blob/release-1.18-confluent/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl#L814


Thanks,
Hao

On Tue, Mar 26, 2024 at 4:15 AM Timo Walther wrote:



Hi Hao,

  > `TABLE(my_data)` and `MODEL(my_cat.my_db.classifier_model)` doesn't
  > work since `TABLE` and `MODEL` are already key words

This argument doesn't count. The parser supports introducing keywords
that are still non-reserved. For example, this enables using "key" for
both primary key and a column name:

CREATE TABLE t (i INT PRIMARY KEY NOT ENFORCED)
WITH ('connector' = 'datagen');

SELECT i AS key FROM t;

I'm sure we will introduce `TABLE(my_data)` eventually as this is what
the standard dictates. But for now, let's use the most compact syntax
possible which is also in sync with Oracle.

TLDR: We allow identifiers as arguments for PTFs which are expanded with
catalog and database if necessary. Those identifier arguments translate
to catalog lookups for table and models. The ML_ functions will make
sure that the arguments are of correct type model or table.

SELECT f1, f2, label FROM
ML_PREDICT(
  input => `my_data`,
  model => `my_cat`.`my_db`.`classifier_model`,
  args => DESCRIPTOR(f1, f2));

So this will allow us to also use in the future:

SELECT * FROM poly_func(table1);

Same support as Oracle [1]. Very concise.

Let me know when you updated the FLIP for a final review before voting.


Do others have additional objections?

Regards,
Timo

[1]
https://livesql.oracle.com/apex/livesql/file/content_HQK7TYEO0NHSJCDY3LN2ERDV6.html




On 25.03.24 23:40, Hao Li wrote:

Hi Timo,


Please double check if this is implementable with the current stack. I
fear the parser or validator might not like the "identifier" argument?

I checked this, currently the validator throws an exception trying to get
the full qualifier name for `classifier_model`. But since
`SqlValidatorImpl` is implemented in Flink, we should be able to fix this.
The only caveat is that if the full model path is not provided,
the qualifier is interpreted as a column. We should be able to special
handle this by rewriting the `ml_predict` function to add the catalog and
database name in `FlinkCalciteSqlValidator` though.


SELECT f1, f2, label FROM

 ML_PREDICT(
   TABLE `my_data`,
   my_cat.my_db.classifier_model,
   DESCRIPTOR(f1, f2))

SELECT f1, f2, label FROM
 ML_PREDICT(
   input => TABLE `my_data`,
   model => my_cat.my_db.classifier_model,
   args => DESCRIPTOR(f1, f2))

I verified these can be parsed. The problem is in validator for qualifier
as mentioned above.


So the safest option would be the long-term solution:


SELECT f1, f2, label FROM
 ML_PREDICT(
   input => TABLE(my_data),
   model => MODEL(my_cat.my_db.classifier_model),
   args => DESCRIPTOR(f1, f2))

`TABLE(my_data)` and `MODEL(my_cat.my_db.classifier_model)` doesn't work
since `TABLE` and `MODEL` are already key words in calcite used by `CREATE
TABLE`, `CREATE MODEL`. Changing to `model_name(...)` works and will be
treated as a function.

So I think

SELECT f1, f2, label FROM
 ML_PREDICT(
   input => TABLE `my_data`,
   model => my_cat.my_d

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-26 Thread Timo Walther

Hi Hao,

> `TABLE(my_data)` and `MODEL(my_cat.my_db.classifier_model)` doesn't
> work since `TABLE` and `MODEL` are already key words

This argument doesn't count. The parser supports introducing keywords 
that are still non-reserved. For example, this enables using "key" for 
both primary key and a column name:


CREATE TABLE t (i INT PRIMARY KEY NOT ENFORCED)
WITH ('connector' = 'datagen');

SELECT i AS key FROM t;

I'm sure we will introduce `TABLE(my_data)` eventually as this is what 
the standard dictates. But for now, let's use the most compact syntax 
possible which is also in sync with Oracle.


TLDR: We allow identifiers as arguments for PTFs which are expanded with 
catalog and database if necessary. Those identifier arguments translate 
to catalog lookups for table and models. The ML_ functions will make 
sure that the arguments are of correct type model or table.


SELECT f1, f2, label FROM
  ML_PREDICT(
input => `my_data`,
model => `my_cat`.`my_db`.`classifier_model`,
args => DESCRIPTOR(f1, f2));

So this will allow us to also use in the future:

SELECT * FROM poly_func(table1);

Same support as Oracle [1]. Very concise.

Let me know when you updated the FLIP for a final review before voting.

Do others have additional objections?

Regards,
Timo

[1] 
https://livesql.oracle.com/apex/livesql/file/content_HQK7TYEO0NHSJCDY3LN2ERDV6.html




On 25.03.24 23:40, Hao Li wrote:

Hi Timo,


Please double check if this is implementable with the current stack. I

fear the parser or validator might not like the "identifier" argument?

I checked this, currently the validator throws an exception trying to get
the full qualifier name for `classifier_model`. But since
`SqlValidatorImpl` is implemented in Flink, we should be able to fix this.
The only caveat is that if the full model path is not provided,
the qualifier is interpreted as a column. We should be able to special
handle this by rewriting the `ml_predict` function to add the catalog and
database name in `FlinkCalciteSqlValidator` though.


SELECT f1, f2, label FROM

ML_PREDICT(
  TABLE `my_data`,
  my_cat.my_db.classifier_model,
  DESCRIPTOR(f1, f2))

SELECT f1, f2, label FROM
ML_PREDICT(
  input => TABLE `my_data`,
  model => my_cat.my_db.classifier_model,
  args => DESCRIPTOR(f1, f2))

I verified these can be parsed. The problem is in validator for qualifier
as mentioned above.


So the safest option would be the long-term solution:


SELECT f1, f2, label FROM
ML_PREDICT(
  input => TABLE(my_data),
  model => MODEL(my_cat.my_db.classifier_model),
  args => DESCRIPTOR(f1, f2))

`TABLE(my_data)` and `MODEL(my_cat.my_db.classifier_model)` doesn't work
since `TABLE` and `MODEL` are already key words in calcite used by `CREATE
TABLE`, `CREATE MODEL`. Changing to `model_name(...)` works and will be
treated as a function.

So I think

SELECT f1, f2, label FROM
ML_PREDICT(
  input => TABLE `my_data`,
  model => my_cat.my_db.classifier_model,
  args => DESCRIPTOR(f1, f2))
should be fine for now.

For the syntax part:
1). Sounds good. We can drop model task and model kind from the definition.
They can be deduced from the options.

2). Sure. We can add temporary model

3). Makes sense. We can use `show create model ` to display all
information and `describe model ` to show input/output schema (a rough
usage sketch follows below).
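A rough usage sketch (hypothetical statements, assuming a model created as
discussed above; the output shapes are illustrative):

SHOW CREATE MODEL `classifier_model`;
-- prints the full definition, including the WITH options

DESCRIBE MODEL `classifier_model`;
-- prints only the declared input/output schema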

Thanks,
Hao

On Mon, Mar 25, 2024 at 3:21 PM Hao Li  wrote:


Hi Ahmed,

Looks like the feature freeze time for 1.20 release is June 15th. We can
definitely get the model DDL into 1.20. For predict and evaluate functions,
if we can't get into the 1.20 release, we can get them into the 1.21
release for sure.

Thanks,
Hao



On Mon, Mar 25, 2024 at 1:25 AM Timo Walther  wrote:


Hi Jark and Hao,

thanks for the information, Jark! Great that the Calcite community
already fixed the problem for us. +1 to adopt the simplified syntax
asap. Maybe even before we upgrade Calcite (i.e. copy over classes), if
upgrading Calcite is too much work right now?

  > Is `DESCRIPTOR` a must in the syntax?

Yes, we should still stick to the standard as much as possible and all
vendors use DESCRIPTOR/COLUMNS for distinguishing columns vs. literal
arguments. So the final syntax of this discussion would be:


SELECT f1, f2, label FROM
ML_PREDICT(TABLE `my_data`, `classifier_model`, DESCRIPTOR(f1, f2))

SELECT * FROM
ML_EVALUATE(TABLE `eval_data`, `classifier_model`, DESCRIPTOR(f1, f2))

Please double check if this is implementable with the current stack. I
fear the parser or validator might not like the "identifier" argument?

Make sure that also these variations are supported:

SELECT f1, f2, label FROM
ML_PREDICT(
  TABLE `my_data`,
  my_cat.my_db.classifier_model,
  DESCRIPTOR(f1, f2))

SELECT f1, f2, label FROM
ML_PREDICT(
  input => TABLE `my_data`,
  model => my_cat.my_db.classifier_model,
  arg

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-25 Thread Timo Walther
After thinking about this more, this discussion boils down to whether 
this is a special table or a special materialized view. In both cases, 
we would need to add a special keyword:


Either

CREATE UPDATING TABLE

or

CREATE UPDATING MATERIALIZED VIEW

I still feel that the periodic refreshing behavior is closer to a MV. If 
we add a special keyword to MV, the optimizer would know that the data 
cannot be used for query optimizations.


I will ask more people for their opinion.

Regards,
Timo


On 25.03.24 10:45, Timo Walther wrote:

Hi Ron and Lincoln,

thanks for the quick response and the very insightful discussion.

 > we might limit future opportunities to optimize queries
 > through automatic materialization rewriting by allowing data
 > modifications, thus losing the potential for such optimizations.

This argument makes a lot of sense to me. Due to the updates, the system 
is not in full control of the persisted data. However, the system is 
still in full control of the job that powers the refresh. So if the 
system manages all updating pipelines, it could still leverage automatic 
materialization rewriting but without leveraging the data at rest (only 
the data in flight).


 > we are considering another candidate, Derived Table, the term 'derive'
 > suggests a query, and 'table' retains modifiability. This approach
 > would not disrupt our current concept of a dynamic table

I did some research on this term. The SQL standard uses the term 
"derived table" extensively (defined in section 4.17.3). Thus, a lot of 
vendors adopt this for simply referring to a table within a subclause:


https://dev.mysql.com/doc/refman/8.0/en/derived-tables.html

https://infocenter.sybase.com/help/topic/com.sybase.infocenter.dc32300.1600/doc/html/san1390612291252.html

https://www.c-sharpcorner.com/article/derived-tables-vs-common-table-expressions/

https://stackoverflow.com/questions/26529804/what-are-the-derived-tables-in-my-explain-statement

https://www.sqlservercentral.com/articles/sql-derived-tables

Esp. the latter example is interesting, SQL Server allows things like 
this on derived tables:


UPDATE T SET Name='Timo' FROM (SELECT * FROM Product) AS T

SELECT * FROM Product;

Btw, Snowflake's dynamic table docs also state:

 > Because the content of a dynamic table is fully determined
 > by the given query, the content cannot be changed by using DML.
 > You don’t insert, update, or delete the rows in a dynamic table.

So a new term makes a lot of sense.

How about using `UPDATING`?

CREATE UPDATING TABLE

This reflects that modifications can be made and from an 
English-language perspective you can PAUSE or RESUME the UPDATING.

Thus, a user can define UPDATING interval and mode?
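To make this concrete, a purely hypothetical sketch (neither the keyword
placement nor the clause names below are agreed on in this thread):

CREATE UPDATING TABLE dwd_orders
  UPDATING EVERY INTERVAL '3' MINUTE   -- the "interval"
  UPDATING MODE CONTINUOUS             -- the "mode"
AS SELECT o.order_id, o.amount, u.region
   FROM orders AS o JOIN users AS u ON o.user_id = u.id;

-- pausing/resuming the UPDATING, as described above
ALTER TABLE dwd_orders SUSPEND UPDATING;
ALTER TABLE dwd_orders RESUME UPDATING;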

Looking forward to your thoughts.

Regards,
Timo


On 25.03.24 07:09, Ron liu wrote:

Hi, Ahmed

Thanks for your feedback.

Regarding your question:


I want to iterate on Timo's comments regarding the confusion between
"Dynamic Table" and current Flink "Table". Should the refactoring of the
system happen in 2.0, should we rename it in this Flip (as the suggestions
in the thread) and address the holistic changes in a separate Flip for 2.0?

Lincoln proposed a new concept in reply to Timo: Derived Table, which is a
combination of Dynamic Table + Continuous Query, and the use of Derived
Table will not conflict with existing concepts, what do you think?

I feel confused with how it is further with other components, the
examples provided feel like a standalone ETL job, could you provide in the
FLIP an example where the table is further used in subsequent queries
(specially in batch mode).

Thanks for your suggestion, I added how to use Dynamic Table in FLIP user
story section, Dynamic Table can be referenced by downstream Dynamic Table
and can also support OLAP queries.

Best,
Ron

On Sat, Mar 23, 2024 at 10:35 AM Ron liu wrote:


Hi, Feng

Thanks for your feedback.


Although currently we restrict users from modifying the query, I wonder if
we can provide a better way to help users rebuild it without affecting
downstream OLAP queries.

Considering the problem of data consistency, in the first step we are
strictly limited in semantics and do not support modifying the query. This
is really a good problem; one of my ideas is to introduce a syntax similar
to SWAP [1], which supports exchanging two Dynamic Tables.
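For illustration only, such an exchange could look roughly like this
(hypothetical syntax, not part of the FLIP):

-- build a new dynamic table with the changed query, then exchange it
-- atomically with the one serving downstream OLAP queries
ALTER DYNAMIC TABLE dt_orders_new SWAP WITH dt_orders;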


 From the documentation, the definitions SQL and job information are
stored in the Catalog. Does this mean that if a system needs to adapt to
Dynamic Tables, it also needs to store Flink's job information in the
corresponding system?
For example, does MySQL's Catalog need to store flink job information as
well?

Yes, currently we need to rely on Catalog to store refresh job 
information.



Users still need to consider how much memory is being used, how large
the concurrency is, which type of state backend is being used, and may need
to set TTL expiration.

Simil

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-25 Thread Timo Walther
ibility of the user, so it's consistent with its behavior and no
additional guarantees are made at the engine level.

Best,
Ron


On Fri, Mar 22, 2024 at 11:50 PM Ahmed Hamdy wrote:


Hi Ron,
Sorry for joining the discussion late, thanks for the effort.

I think the base idea is great, however I have a couple of comments:
- I want to iterate on Timo's comments regarding the confusion between
"Dynamic Table" and current Flink "Table". Should the refactoring of the
system happen in 2.0, should we rename it in this Flip ( as the
suggestions
in the thread ) and address the holistic changes in a separate Flip for
2.0?
- I feel confused with how it is further with other components, the
examples provided feel like a standalone ETL job, could you provide in the
FLIP an example where the table is further used in subsequent queries
(specially in batch mode).
- I really like the standard of keeping the unified batch and streaming
approach
Best Regards
Ahmed Hamdy


On Fri, 22 Mar 2024 at 12:07, Lincoln Lee  wrote:


Hi Timo,

Thanks for your thoughtful inputs!

Yes, expanding the MATERIALIZED VIEW (MV) could achieve the same function,
but our primary concern is that by using a view, we might limit future
opportunities
to optimize queries through automatic materialization rewriting [1],
leveraging
the support for MV by physical storage. This is because we would be
breaking
the intuitive semantics of a materialized view (a materialized view
represents
the result of a query) by allowing data modifications, thus losing the
potential
for such optimizations.

With these considerations in mind, we were inspired by Google Looker's
Persistent
Derived Table [2]. PDT is designed for building Looker's automated
modeling,
aligning with our purpose for the stream-batch automatic pipeline.
Therefore,
we are considering another candidate, Derived Table, the term 'derive'
suggests a query, and 'table' retains modifiability. This approach would
not disrupt our current concept of a dynamic table, preserving the future
utility of MVs.

Conceptually, a Derived Table is a Dynamic Table + Continuous Query. By
introducing a new concept, Derived Table, for this FLIP, this makes all
concepts play together nicely.

What do you think about this?

[1] https://calcite.apache.org/docs/materialized_views.html
[2]
https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables



Best,
Lincoln Lee


On Fri, Mar 22, 2024 at 5:54 PM Timo Walther wrote:


Hi Ron,

thanks for the detailed answer. Sorry, for my late reply, we had a
conference that kept me busy.

  > In the current concept[1], it actually includes: Dynamic Tables &
  > Continuous Query. Dynamic Table is just an abstract logical concept


This explanation makes sense to me. But the docs also say "A continuous
query is evaluated on the dynamic table yielding a new dynamic table.".

So even our regular CREATE TABLEs are considered dynamic tables. This
can also be seen in the diagram "Dynamic Table -> Continuous Query ->
Dynamic Table". Currently, Flink queries can only be executed on Dynamic
Tables.

  > In essence, a materialized view represents the result of a query.

Isn't that what your proposal does as well?

  > the object of the suspend operation is the refresh task of the
dynamic table

I understand that Snowflake uses the term [1] to merge their concepts of
STREAM, TASK, and TABLE into one piece of concept. But Flink has no
concept of a "refresh task". Also, they already introduced MATERIALIZED
VIEW. Flink is in the convenient position that the concept of
materialized views is not taken (reserved maybe for exactly this use
case?). And the SQL standard concept could be "slightly adapted" to our
needs. Looking at other vendors like Postgres[2], they also use
`REFRESH` commands, so why not add additional commands such as DELETE
or UPDATE. Oracle supports the "ON PREBUILT TABLE clause tells the
database to use an existing table segment"[3] which comes closer to what
we want as well.

  > it is not intended to support data modification

This is an argument that I understand. But we as Flink could allow data
modifications. This way we are only extending the standard and don't
introduce new concepts.

If we can't agree on using the MATERIALIZED VIEW concept, we should fix
our syntax in a Flink 2.0 effort, making regular tables bounded and
dynamic tables unbounded. We would be closer to the SQL standard with
this and pave the way for the future. I would actually support this if
all concepts play together nicely.

  > In the future, we can consider extending the statement set syntax to
support the creation of multiple dynamic tables.

It's good that we called the concept STATEMENT SET. This allows us to
defined CREATE TABLE within. Even if it might look a bit confusing.

Regards,
Timo

[1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
[2]
https://www.postgresql.org/docs/curren

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-25 Thread Timo Walther
ink the ML model FLIP is not blocked by this, because we
can introduce ML_PREDICT and ML_EVALUATE as built-in PTFs
just like TUMBLE/HOP. And support user-defined ML functions as
a future FLIP.

Regarding the simplified PTF syntax which reduces the outer TABLE() keyword,
it seems it was just supported[1] by the Calcite community last month and
will be released in the next version (v1.37). The Calcite community is
preparing the 1.37 release, so we can bump the version if needed in Flink 1.19.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/CALCITE-6254

On Fri, 22 Mar 2024 at 21:46, Timo Walther  wrote:


Hi everyone,

this is a very important change to the Flink SQL syntax but we can't
wait until the SQL standard is ready for this. So I'm +1 on introducing
the MODEL concept as a first class citizen in Flink.

For your information: Over the past months I have already spent a
significant amount of time thinking about how we can introduce PTFs in
Flink. I reserved FLIP-440[1] for this purpose and I will share a
version of this in the next 1-2 weeks.

For a good implementation of FLIP-440 and also FLIP-437, we should
evolve the PTF syntax in collaboration with Apache Calcite.

There are different syntax versions out there:

1) Flink

SELECT * FROM
  TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

2) SQL standard

SELECT * FROM
  TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

3) Oracle

SELECT * FROM
   TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES);

As you can see above, Flink does not follow the standard correctly as it
would need to use `TABLE()` but this is not provided by Calcite yet.

I really like the Oracle syntax[2][3] a lot. It reduces necessary
keywords to a minimum. Personally, I would like to discuss this syntax
in a separate FLIP and hope I will find supporters for:


SELECT * FROM
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);

If we go entirely with the Oracle syntax, as you can see in the example,
Oracle allows for passing identifiers directly. This would solve our
problems for the MODEL as well:

SELECT f1, f2, label FROM ML_PREDICT(
data => `my_data`,
model => `classifier_model`,
input => DESCRIPTOR(f1, f2));

Or we completely adopt the Oracle syntax:

SELECT f1, f2, label FROM ML_PREDICT(
data => `my_data`,
model => `classifier_model`,
input => COLUMNS(f1, f2));


What do you think?

Happy to create a FLIP for just this syntax question and collaborate
with the Calcite community on this. Supporting the syntax of Oracle
shouldn't be too hard to convince at least as parser parameter.

Regards,
Timo

[1]
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
[2]
https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
[3]
https://oracle-base.com/articles/18c/polymorphic-table-functions-18c




On 20.03.24 17:22, Mingge Deng wrote:

Thanks Jark for all the insightful comments.

We have updated the proposal per our offline discussions:
1. Model will be treated as a new relation in FlinkSQL.
2. Include the common ML predict and evaluate functions into the open
source flink to complete the user journey.
  And we should be able to extend the calcite SqlTableFunction to support
these two ML functions.

Best,
Mingge

On Mon, Mar 18, 2024 at 7:05 PM Jark Wu  wrote:


Hi Hao,


I meant how the table name

in window TVF gets translated to `SqlCallingBinding`. Probably we need to
fetch the table definition from the catalog somewhere. Do we treat those
window TVF specially in parser/planner so that catalog is looked up when
they are seen?

The table names are resolved and validated by Calcite SqlValidator. We
don't need to fetch from catalog manually.
The specific checking logic of cumulate window happens in
SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
The return type of SqlCumulateTableFunction is defined in
#getRowTypeInference() method.
Both are public interfaces provided by Calcite and it seems it's not
specially handled in parser/planner.

I didn't try that, but my gut feeling is that the framework is ready to
extend a customized TVF.


For what model is, I'm wondering if it has to be datatype or relation. Can
it be another kind of citizen parallel to datatype/relation/function/db?
Redshift also supports `show models` operation, so it seems it's treated
specially as well?

If it is an entity only used in catalog scope (e.g., show xxx, create xxx,
drop xxx), it is fine to introduce it.
We have introduced such one before, called Module: "load module", "show
modules" [1].
But if we want to use Model in TVF parameters, it means it has to be a
relation or datatype, because that is what it only accepts now.

Thanks for sharing the reason of prefe

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-22 Thread Timo Walther

Hi everyone,

this is a very important change to the Flink SQL syntax but we can't 
wait until the SQL standard is ready for this. So I'm +1 on introducing 
the MODEL concept as a first class citizen in Flink.


For your information: Over the past months I have already spent a 
significant amount of time thinking about how we can introduce PTFs in 
Flink. I reserved FLIP-440[1] for this purpose and I will share a 
version of this in the next 1-2 weeks.


For a good implementation of FLIP-440 and also FLIP-437, we should 
evolve the PTF syntax in collaboration with Apache Calcite.


There are different syntax versions out there:

1) Flink

SELECT * FROM
  TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

2) SQL standard

SELECT * FROM
  TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

3) Oracle

SELECT * FROM
   TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES);

As you can see above, Flink does not follow the standard correctly as it 
would need to use `TABLE()` but this is not provided by Calcite yet.


I really like the Oracle syntax[2][3] a lot. It reduces necessary 
keywords to a minimum. Personally, I would like to discuss this syntax 
in a separate FLIP and hope I will find supporters for:



SELECT * FROM
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);

If we go entirely with the Oracle syntax, as you can see in the example, 
Oracle allows for passing identifiers directly. This would solve our 
problems for the MODEL as well:


SELECT f1, f2, label FROM ML_PREDICT(
  data => `my_data`,
  model => `classifier_model`,
  input => DESCRIPTOR(f1, f2));

Or we completely adopt the Oracle syntax:

SELECT f1, f2, label FROM ML_PREDICT(
  data => `my_data`,
  model => `classifier_model`,
  input => COLUMNS(f1, f2));


What do you think?

Happy to create a FLIP for just this syntax question and collaborate 
with the Calcite community on this. Supporting the syntax of Oracle 
shouldn't be too hard to convince at least as parser parameter.


Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
[2] 
https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072

[3] https://oracle-base.com/articles/18c/polymorphic-table-functions-18c



On 20.03.24 17:22, Mingge Deng wrote:

Thanks Jark for all the insightful comments.

We have updated the proposal per our offline discussions:
1. Model will be treated as a new relation in FlinkSQL.
2. Include the common ML predict and evaluate functions into the open
source flink to complete the user journey.
 And we should be able to extend the calcite SqlTableFunction to support
these two ML functions.

Best,
Mingge

On Mon, Mar 18, 2024 at 7:05 PM Jark Wu  wrote:


Hi Hao,


I meant how the table name

in window TVF gets translated to `SqlCallingBinding`. Probably we need to
fetch the table definition from the catalog somewhere. Do we treat those
window TVF specially in parser/planner so that catalog is looked up when
they are seen?

The table names are resolved and validated by Calcite SqlValidator.  We
don't need to fetch from catalog manually.
The specific checking logic of cumulate window happens in
SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
The return type of SqlCumulateTableFunction is defined in
#getRowTypeInference() method.
Both are public interfaces provided by Calcite and it seems it's not
specially handled in parser/planner.

I didn't try that, but my gut feeling is that the framework is ready to
extend a customized TVF.


For what model is, I'm wondering if it has to be datatype or relation.

Can
it be another kind of citizen parallel to datatype/relation/function/db?
Redshift also supports `show models` operation, so it seems it's treated
specially as well?

If it is an entity only used in catalog scope (e.g., show xxx, create xxx,
drop xxx), it is fine to introduce it.
We have introduced such one before, called Module: "load module", "show
modules" [1].
But if we want to use Model in TVF parameters, it means it has to be a
relation or datatype, because
that is what it only accepts now.

Thanks for sharing the reason of preferring TVF instead of Redshift way. It
sounds reasonable to me.

Best,
Jark

  [1]:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/

On Fri, 15 Mar 2024 at 13:41, Hao Li  wrote:


Hi Jark,

Thanks for the pointer. Sorry for the confusion: I meant how the table

name

in window TVF gets translated to `SqlCallingBinding`. Probably we need to
fetch the table definition from the catalog somewhere. Do we treat those
window TVF specially in parser/planner so that catalog is looked up when
they are seen?

For what model is, I'm wondering if it has to be datatype or relation.

Can

it be another kind of citizen parallel to datatype/relation/function/db?
Redshift also supports `show models` 

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-22 Thread Timo Walther
to achieve
dynamic updates of the target table similar to a database’s
Materialized View.
We hope to upgrade the Dynamic Table to a real entity that users can
operate, which combines the logical concepts of Dynamic Tables +
Continuous Query. By integrating the definition of tables and queries,
it can achieve functions similar to Materialized Views, simplifying
users' data processing pipelines.
So, the object of the suspend operation is the refresh task of the
dynamic table. The command  `ALTER DYNAMIC TABLE table_name SUSPEND `
is actually a shorthand for `ALTER DYNAMIC TABLE table_name SUSPEND
REFRESH` (if written in full for clarity, we can also modify it).

  2. Initially, we also considered Materialized Views
, but ultimately decided against them. Materialized views are designed
to enhance query performance for workloads that consist of common,
repetitive query patterns. In essence, a materialized view represents
the result of a query.
However, it is not intended to support data modification. For
Lakehouse scenarios, where the ability to delete or update data is
crucial (such as compliance with GDPR, FLIP-2), materialized views
fall short.

3.
Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only
defines metadata in the catalog but also automatically initiates a
data refresh task based on the query specified during table creation.
It dynamically executes data updates. Users can focus on data
dependencies and data generation logic.

4.
The new dynamic table does not conflict with the existing
DynamicTableSource and DynamicTableSink interfaces. For the developer,
all that needs to be implemented is the new CatalogDynamicTable,
without changing the implementation of source and sink.

5. For now, the FLIP does not consider supporting Table API operations on
Dynamic Table. However, once the SQL syntax is finalized, we can discuss
this in a separate FLIP. Currently, I have a rough idea: the Table API
should also introduce DynamicTable operation interfaces corresponding to
the existing Table interfaces. The TableEnvironment will provide relevant
methods to support various dynamic table operations. The goal for the new
Dynamic Table is to offer users an experience similar to using a database,
which is why we prioritize SQL-based approaches initially.


How do you envision re-adding the functionality of a statement set, that
fans out to multiple tables? This is a very important use case for data
pipelines.



Multi-tables is indeed a very important user scenario. In the future,
we can consider extending the statement set syntax to support the
creation of multiple dynamic tables.



Since the early days of Flink SQL, we were discussing `SELECT STREAM *
FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT,
into other keywords DYNAMIC TABLE and FRESHNESS. But the core
functionality is still there. I'm wondering if we should widen the scope
(maybe not part of this FLIP but a new FLIP) to follow the standard more
closely. Making `SELECT * FROM t` bounded by default and use new syntax
for the dynamic behavior. Flink 2.0 would be the perfect time for this,
however, it would require careful discussions. What do you think?



The query part indeed requires a separate FLIP
for discussion, as it involves changes to the default behavior.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables



Best,

Ron


On Wed, Mar 13, 2024 at 3:19 PM Jing Zhang wrote:


Hi, Lincoln & Ron,

Thanks for the proposal.

I agree with the question raised by Timo.

Besides, I have some other questions.
1. How to define query of dynamic table?
Use flink sql or introducing new syntax?
If use flink sql, how to handle the difference in SQL between streaming
and batch processing?
For example, a query including window aggregate based on processing time?
or a query including global order by?

2. Whether modify the query of dynamic table is allowed?
Or we could only refresh a dynamic table based on initial query?

3. How to use dynamic table?
The dynamic table seems to be similar with materialized view. Will we do
something like materialized view rewriting during the optimization?

Best,
Jing Zhang


On Wed, Mar 13, 2024 at 1:24 AM Timo Walther wrote:


Hi Lincoln & Ron,

thanks for proposing this FLIP. I think a design similar to what you
propose has been in the heads of many people, however, I'm wondering how
this will fit into the bigger picture.

I haven't deeply reviewed the FLIP yet, but would like to ask some
initial questions:

Flink has introduced the concept of Dynamic Tables many years ago. How
does the term "Dynamic Table" fit into Flink's regular tables and also
how does it relate to Table API?

I fear that adding the DYNAMIC TABLE keyword could cause confusion for
users, because a term for regular CREATE TABLE (that can be "kind of
dynamic" as well and is backed by a changelog) is then missing. Also
given that we c

[jira] [Created] (FLINK-34902) INSERT INTO column mismatch leads to IndexOutOfBoundsException

2024-03-21 Thread Timo Walther (Jira)
Timo Walther created FLINK-34902:


 Summary: INSERT INTO column mismatch leads to 
IndexOutOfBoundsException
 Key: FLINK-34902
 URL: https://issues.apache.org/jira/browse/FLINK-34902
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther


SQL:
{code}
INSERT INTO t (a, b) SELECT 1;
{code}

 

Stack trace:
{code}

org.apache.flink.table.api.ValidationException: SQL validation failed. Index 1 
out of bounds for length 1
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
    at
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for 
length 1
    at 
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
    at 
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
    at java.base/java.util.Objects.checkIndex(Objects.java:374)
    at java.base/java.util.ArrayList.get(ArrayList.java:459)
    at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1(PreValidateReWriter.scala:355)
    at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1$adapted(PreValidateReWriter.scala:355)

{code}
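For comparison, a sketch of the expected behavior (assuming {{t}} has exactly the columns {{a}} and {{b}}):

{code}
-- matching column count: should validate and run
INSERT INTO t (a, b) SELECT 1, 2;

-- mismatching column count: should fail with a readable ValidationException
-- instead of surfacing an IndexOutOfBoundsException
INSERT INTO t (a, b) SELECT 1;
{code}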



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-12 Thread Timo Walther

Hi Lincoln & Ron,

thanks for proposing this FLIP. I think a design similar to what you 
propose has been in the heads of many people, however, I'm wondering how 
this will fit into the bigger picture.


I haven't deeply reviewed the FLIP yet, but would like to ask some 
initial questions:


Flink has introduced the concept of Dynamic Tables many years ago. How 
does the term "Dynamic Table" fit into Flink's regular tables and also 
how does it relate to Table API?


I fear that adding the DYNAMIC TABLE keyword could cause confusion for 
users, because a term for regular CREATE TABLE (that can be "kind of 
dynamic" as well and is backed by a changelog) is then missing. Also 
given that we call our connectors for those tables, DynamicTableSource 
and DynamicTableSink.


In general, I find it contradicting that a TABLE can be "paused" or 
"resumed". From an English language perspective, this does sound 
incorrect. In my opinion (without much research yet), a continuous 
updating trigger should rather be modelled as a CREATE MATERIALIZED VIEW 
(which users are familiar with?) or a new concept such as a CREATE TASK 
(that can be paused and resumed?).


How do you envision re-adding the functionality of a statement set, that 
fans out to multiple tables? This is a very important use case for data 
pipelines.


Since the early days of Flink SQL, we were discussing `SELECT STREAM * 
FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT, 
into other keywords DYNAMIC TABLE and FRESHNESS. But the core 
functionality is still there. I'm wondering if we should widen the scope 
(maybe not part of this FLIP but a new FLIP) to follow the standard more 
closely. Making `SELECT * FROM t` bounded by default and use new syntax 
for the dynamic behavior. Flink 2.0 would be the perfect time for this, 
however, it would require careful discussions. What do you think?


Regards,
Timo


On 11.03.24 08:23, Ron liu wrote:

Hi, Dev


Lincoln Lee and I would like to start a discussion about FLIP-435:
Introduce a  New Dynamic Table for Simplifying Data Pipelines.


This FLIP is designed to simplify the development of data processing
pipelines. With Dynamic Tables with uniform SQL statements and
freshness, users can define batch and streaming transformations to
data in the same way, accelerate ETL pipeline development, and manage
task scheduling automatically.


For more details, see FLIP-435 [1]. Looking forward to your feedback.


[1]


Best,

Lincoln & Ron





Re: Default scale and precision SQL data types

2024-03-12 Thread Timo Walther

Hi Sergei,

please check with the SQL standard before. Most of these values have 
been derived from the standard. I don't like the default of TIMESTAMP(6) 
for timestamps but this is what the standard dictates. Same for not 
allowing VARCHAR(0) or VARCHAR defaulting to VARCHAR(1).
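For reference, a small sketch of the defaults as Flink documents them today
(worth double-checking each one against the standard before writing a FLIP):

CREATE TABLE defaults_demo (
  c CHAR,        -- treated as CHAR(1)
  v VARCHAR,     -- treated as VARCHAR(1)
  d DECIMAL,     -- treated as DECIMAL(10, 0)
  t TIMESTAMP    -- treated as TIMESTAMP(6)
) WITH ('connector' = 'datagen');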


Changes to the type system affect a lot of downstream projects and 
people. Be aware that the FLIP might not be accepted.


Regards,
Timo


On 07.03.24 10:05, lorenzo.affe...@ververica.com.INVALID wrote:

Hello Sergei!

The proposal makes a lot of sense, and Martijn is right as well.
Are you willing to drive the FLIP effort?
Do you need any assistance with that?
On Mar 4, 2024 at 01:48 +0100, Martijn Visser , wrote:

Hi,

I think it would first require a FLIP, given it touches on the core type
system of SQL.

Best regards,

Martijn

On Sat, Mar 2, 2024 at 5:34 PM Sergei Morozov  wrote:


Hi there,

org.apache.flink.table.api.DataTypes allows the creation of temporal data
types by specifying precision (e.g. TIME(3)) or omitting it (TIME()). The
ability to omit precision for temporal types was introduced in
apache/flink@36fef44
<https://github.com/apache/flink/commit/36fef4457a7f1de47989c8a2485581bcf8633b32>.

Unfortunately, this isn't possible for other data types (e.g. CHAR,
DECIMAL).
Even though they define defaults for length, precision, and scale, their
values have to be passed to the method explicitly.

Would a PR be accepted which will introduce the methods for the remaining
types similar to the temporal ones?

Thanks.







[jira] [Created] (FLINK-34476) Window TVFs with named parameters don't support column expansion

2024-02-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-34476:


 Summary: Window TVFs with named parameters don't support column 
expansion
 Key: FLINK-34476
 URL: https://issues.apache.org/jira/browse/FLINK-34476
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


It seems named parameters still have issues with column expansion of virtual 
metadata column:

{code}

SELECT * FROM TABLE(  TUMBLE( DATA => TABLE gaming_player_activity_source, 
TIMECOL => DESCRIPTOR(meta_col), SIZE => INTERVAL '10' MINUTES));

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34469) Implement TableDistribution toString

2024-02-19 Thread Timo Walther (Jira)
Timo Walther created FLINK-34469:


 Summary: Implement TableDistribution toString
 Key: FLINK-34469
 URL: https://issues.apache.org/jira/browse/FLINK-34469
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


The newly added TableDistribution misses a toString implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34316) Reduce instantiation of ScanRuntimeProvider for in streaming mode

2024-01-30 Thread Timo Walther (Jira)
Timo Walther created FLINK-34316:


 Summary: Reduce instantiation of ScanRuntimeProvider for in 
streaming mode
 Key: FLINK-34316
 URL: https://issues.apache.org/jira/browse/FLINK-34316
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


This is a pure performance optimization by avoiding an additional call to
{{ScanTableSource#getScanRuntimeProvider}} in
{{org.apache.flink.table.planner.connectors.DynamicSourceUtils#validateScanSource}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Accept Flink CDC into Apache Flink

2024-01-15 Thread Timo Walther

+1 (binding)

Cheers,
Timo


On 09.01.24 10:58, xiangyu feng wrote:

+1 (non-binding)

Regards,
Xiangyu Feng

Danny Cranmer  于2024年1月9日周二 17:50写道:


+1 (binding)

Thanks,
Danny

On Tue, Jan 9, 2024 at 9:31 AM Feng Jin  wrote:


+1 (non-binding)

Best,
Feng Jin

On Tue, Jan 9, 2024 at 5:29 PM Yuxin Tan  wrote:


+1 (non-binding)

Best,
Yuxin


On Tue, Jan 9, 2024 at 5:25 PM Márton Balassi wrote:


+1 (binding)

On Tue, Jan 9, 2024 at 10:15 AM Leonard Xu wrote:



+1(binding)

Best,
Leonard


On Jan 9, 2024, at 5:08 PM, Yangze Guo wrote:

+1 (non-binding)

Best,
Yangze Guo

On Tue, Jan 9, 2024 at 5:06 PM Robert Metzger  wrote:


+1 (binding)


On Tue, Jan 9, 2024 at 9:54 AM Guowei Ma wrote:



+1 (binding)
Best,
Guowei


On Tue, Jan 9, 2024 at 4:49 PM Rui Fan <1996fan...@gmail.com> wrote:



+1 (non-binding)

Best,
Rui

On Tue, Jan 9, 2024 at 4:41 PM Hang Ruan  wrote:



+1 (non-binding)

Best,
Hang

On Tue, Jan 9, 2024 at 4:25 PM gongzhongqiang wrote:



+1 non-binding

Best,
Zhongqiang

On Tue, Jan 9, 2024 at 3:05 PM Leonard Xu wrote:


Hello all,

This is the official vote whether to accept the Flink CDC code contribution
to Apache Flink.

The current Flink CDC code, documentation, and website can be found here:
code: https://github.com/ververica/flink-cdc-connectors
docs: https://ververica.github.io/flink-cdc-connectors/

This vote should capture whether the Apache Flink community is interested
in accepting, maintaining, and evolving Flink CDC.

Regarding my original proposal[1] in the dev mailing list, I firmly believe
that this initiative aligns perfectly with Flink. For the Flink community,
it represents an opportunity to bolster Flink's competitive edge in
streaming data integration, fostering the robust growth and prosperity of
the Apache Flink ecosystem. For the Flink CDC project, becoming a
sub-project of Apache Flink means becoming an integral part of a neutral
open-source community, capable of attracting a more diverse pool of
contributors.

All Flink CDC maintainers are dedicated to continuously contributing to
achieve seamless integration with Flink. Additionally, PMC members like
Jark, Qingsheng, and I are willing to facilitate the expansion of
contributors and committers to effectively maintain this new sub-project.

This is an "Adoption of a new Codebase" vote as per the Flink bylaws [2].

Only PMC votes are binding. The vote will be open at least 7 days
(excluding weekends), meaning until Thursday January 18 12:00 UTC, or
until we achieve the 2/3rd majority. We will follow the instructions in
the Flink Bylaws in the case of insufficient active binding voters:

1. Wait until the minimum length of the voting passes.
2. Publicly reach out via personal email to the remaining binding voters
in the voting mail thread for at least 2 attempts with at least 7 days
between two attempts.
3. If the binding voter being contacted still failed to respond after all
the attempts, the binding voter will be considered as inactive for the
purpose of this particular voting.

Welcome voting!

Best,
Leonard
[1] https://lists.apache.org/thread/o7klnbsotmmql999bnwmdgo56b6kxx9l
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026

























[jira] [Created] (FLINK-34095) Add a restore test for StreamExecAsyncCalc

2024-01-15 Thread Timo Walther (Jira)
Timo Walther created FLINK-34095:


 Summary: Add a restore test for StreamExecAsyncCalc
 Key: FLINK-34095
 URL: https://issues.apache.org/jira/browse/FLINK-34095
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Alan Sheinberg


StreamExecAsyncCalc needs at least one restore test to check whether the node 
can be restored from a CompiledPlan and whether restore from a savepoint works.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34094) Document new AsyncScalarFunction

2024-01-15 Thread Timo Walther (Jira)
Timo Walther created FLINK-34094:


 Summary: Document new AsyncScalarFunction
 Key: FLINK-34094
 URL: https://issues.apache.org/jira/browse/FLINK-34094
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Timo Walther
Assignee: Alan Sheinberg


Write documentation in the user-defined functions page. Maybe summarize the
behavior of async calls in general. We could also think about adding a REST
API example in flink-table-examples?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-387: Support named parameters for functions and call procedures

2024-01-05 Thread Timo Walther

Thanks for the last minute change.

+1 (binding)

Cheers,
Timo


On 05.01.24 08:59, Feng Jin wrote:

Hi Timo,

Thank you for the suggestion. Previously, I thought most parameters were
optional, so the default value was set to true.

Your concern is reasonable. We should declare it as false by default and
developers should explicitly state if a parameter is optional instead of
using our default value.

Regarding this part, I have already made modifications in the document.


Best,
Feng


On Fri, Jan 5, 2024 at 3:38 PM Timo Walther  wrote:


Thanks, for starting the VOTE thread and thanks for considering my
feedback. One last comment before I'm also happy to give my +1 to this:

Why is ArgumentHint's default isOptional=true? Shouldn't it be false by
default? Many function implementers will forget to set this to false and
suddenly get NULLs passed to their functions. Marking an argument as
optional should be an explicit decision of an implementer.

Regards,
Timo


On 05.01.24 05:06, Lincoln Lee wrote:

+1 (binding)

Best,
Lincoln Lee


On Fri, Jan 5, 2024 at 11:46 AM Benchao Li wrote:


+1 (binding)

On Fri, Jan 5, 2024 at 10:49 AM Feng Jin wrote:


Hi everyone

Thanks for all the feedback about the FLIP-387: Support named parameters
for functions and call procedures [1] [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours(excluding weekends,until Jan 10, 12:00AM GMT) unless there is an
objection or an insufficient number of votes.



[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures

[2] https://lists.apache.org/thread/bto7mpjvcx7d7k86owb00dwrm65jx8cn


Best,
Feng Jin




--

Best,
Benchao Li












Re: [VOTE] FLIP-387: Support named parameters for functions and call procedures

2024-01-04 Thread Timo Walther
Thanks, for starting the VOTE thread and thanks for considering my 
feedback. One last comment before I'm also happy to give my +1 to this:


Why is ArgumentHint's default isOptional=true? Shouldn't it be false by
default? Many function implementers will forget to set this to false and 
suddenly get NULLs passed to their functions. Marking an argument as 
optional should be an explicit decision of an implementer.


Regards,
Timo


On 05.01.24 05:06, Lincoln Lee wrote:

+1 (binding)

Best,
Lincoln Lee


On Fri, Jan 5, 2024 at 11:46 AM Benchao Li wrote:


+1 (binding)

On Fri, Jan 5, 2024 at 10:49 AM Feng Jin wrote:


Hi everyone

Thanks for all the feedback about the FLIP-387: Support named parameters
for functions and call procedures [1] [2] .

I'd like to start a vote for it. The vote will be open for at least 72
hours(excluding weekends,until Jan 10, 12:00AM GMT) unless there is an
objection or an insufficient number of votes.



[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures

[2] https://lists.apache.org/thread/bto7mpjvcx7d7k86owb00dwrm65jx8cn


Best,
Feng Jin




--

Best,
Benchao Li







Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI

2024-01-02 Thread Timo Walther

Hi Jane,

thanks for the heavy investigation and extensive summaries. I'm sorry 
that I ignored this discussion for too long but would like to help in 
shaping a sustainable long-term solution.


I fear that changing:
- RowType#copy()
- RowType's constructor
- FieldsDataType#nullable()
will not solve all transitive issues.

We should approach the problem from a different perspective. In my point 
of view:

- DataType and LogicalType are just type declarations.
- RelDataType is similarly just a type declaration. Please correct me if 
I'm wrong but RelDataType itself also allows `ROW NOT 
NULL`. It's the factory or optimizer that performs necessary changes.
- It's up to the framework (i.e. planner or Table API) to decide what to 
do with these declarations.


Let's take a Java class:

class MyPojo {
  int i;
}

MyPojo can be nullable, but i cannot. This is the reason why we decided 
to introduce the current behavior. Complex structs are usually generated 
from Table API or from the catalog (e.g. when mapping to schema registry 
or some other external system). It could lead to other downstream 
inconsistencies if we change the method above.
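
For illustration, a small sketch of the same point with the existing DataTypes API (declaration only, not a proposal):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class RowNullabilityExample {
    public static void main(String[] args) {
        // Mirrors the MyPojo example: the declared row may be nullable
        // while its field keeps the NOT NULL constraint.
        DataType pojoLike =
                DataTypes.ROW(DataTypes.FIELD("i", DataTypes.INT().notNull())).nullable();

        // Table API currently accepts this declaration as-is;
        // SQL would implicitly relax the field's nullability (see CALCITE-2464).
        System.out.println(pojoLike);
    }
}
```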


I can't provide a better solution right now; I need more research on 
this topic. But we should definitely avoid another breaking change 
similar to [1] where the data type system was touched and other projects 
were affected.


How about we work together on this topic and create a FLIP for this? We 
need more examples in a unified document. Currently, the proposal is 
split across multiple Flink and Calcite JIRA issues and a ML discussion.


Regards,
Timo


[1] https://issues.apache.org/jira/browse/FLINK-33523


On 26.12.23 04:47, Jane Chan wrote:

Thanks Shengkai and Xuyang.

@Shengkai

I have one question: Is the influence only limited to the RowType? Does the

Map or Array type have the same problems?



I think the issue is exclusive to RowType. You may want to review
CALCITE-2464[1] for more details.

[1] https://issues.apache.org/jira/browse/CALCITE-2464

@Xuyang

Is it possible to consider introducing a deprecated option to allow users

to fall back to the previous version (default fallback), and then
officially deprecate it in Flink 2.0?



If I understand correctly, 2.0 allows breaking changes to remove historical
baggage in this release. Therefore, if we want to fix this issue before
2.0, we could introduce a fallback option in the two most recent versions
(1.19 and 1.20). However, from version 2.0 onwards, since we no longer
promise backward compatibility, introducing a fallback option might be
unnecessary. What do you think?

BTW, this JIRA, FLINK-33217 [1], is caused by Flink SQL not handling
the nullable attribute of the Row type in the way Calcite expects.
However, fixing this will also have a relatively large impact. We may also
need to check the corresponding code on the SQL side.



Yes, this is another issue caused by the row type nullability handling.
I've mentioned this JIRA ticket in the reference link to the previous
reply.

Best,
Jane

On Mon, Dec 25, 2023 at 1:42 PM Xuyang  wrote:


Hi, Jane, thanks for driving this.


IMO, it is important to keep consistent semantics between the Table API
and SQL, not only for maintenance but also for user experience. But for
users, the impact of this modification is a bit large. Is it possible to
consider introducing a deprecated option that allows users to fall back to the
previous behavior (fallback by default), and then officially deprecate it in
Flink 2.0?


BTW, this JIRA, FLINK-33217 [1], is caused by Flink SQL not handling
the nullable attribute of the Row type in the way Calcite expects.
However, fixing this will also have a relatively large impact. We may also
need to check the corresponding code on the SQL side.


[1] https://issues.apache.org/jira/browse/FLINK-33217




--

 Best!
 Xuyang





At 2023-12-25 10:16:28, "Shengkai Fang"  wrote:

Thanks for Jane and Sergey's proposal!

+1 to correct the Table API behavior.

I have one question: Is the influence only limited to the RowType? Does

the

Map or Array type have the same problems?

Best,
Shengkai
[DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL
and TableA

Jane Chan  wrote on Fri, Dec 22, 2023 at 17:40:


Dear devs,

Several issues [1][2][3] have been identified regarding the inconsistent
treatment of ROW type nullability between SQL and TableAPI. However,
addressing these discrepancies might necessitate updates to the public

API.

Therefore, I'm initiating this discussion to engage the community in
forging a unified approach to resolve these challenges.

To summarize, SQL prohibits ROW types such as ROW, which is implicitly rewritten to ROW by
Calcite[4]. In contrast, TableAPI permits such types, resulting in
inconsistency.
For a comprehensive issue breakdown, please refer to the comment of [1].

According to CALCITE-2464[4], ROW is not a valid type.

As

a result, the behavior of TableAPI is incorrect and needs to be

consistent


Re: [VOTE] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-28 Thread Timo Walther
+1 (binding)

Cheers,
Timo

> Am 28.12.2023 um 03:13 schrieb Yuepeng Pan :
> 
> +1 (non-binding).
> 
> Best,
> Yuepeng Pan.
> 
> 
> 
> 
> At 2023-12-28 09:19:37, "Lincoln Lee"  wrote:
>> +1 (binding)
>> 
>> Best,
>> Lincoln Lee
>> 
>> 
Martijn Visser  wrote on Wed, Dec 27, 2023 at 23:16:
>> 
>>> +1 (binding)
>>> 
>>> On Fri, Dec 22, 2023 at 1:44 AM Jim Hughes 
>>> wrote:
 
 Hi Alan,
 
 +1 (non binding)
 
 Cheers,
 
 Jim
 
 On Wed, Dec 20, 2023 at 2:41 PM Alan Sheinberg
  wrote:
 
> Hi everyone,
> 
> I'd like to start a vote on FLIP-400 [1]. It covers introducing a new
>>> UDF
> type, AsyncScalarFunction for completing invocations asynchronously.
>>> It
> has been discussed in this thread [2].
> 
> I would like to start a vote.  The vote will be open for at least 72
>>> hours
> (until December 28th 18:00 GMT) unless there is an objection or
> insufficient votes.
> 
> [1]
> 
> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> [2] https://lists.apache.org/thread/q3st6t1w05grd7bthzfjtr4r54fv4tm2
> 
> Thanks,
> Alan
> 
>>> 



Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2023-12-19 Thread Timo Walther

Hi Xuyang,

sorry I missed the ping. Sounds reasonable to me. One FLIP about 
changelog semantics, the other about SQL semantics.


Regards,
Timo

On 19.12.23 02:39, Xuyang wrote:

Hi, Timo. Sorry for this noise.
What do you think about splitting the flip like this?




--

 Best!
 Xuyang





At 2023-12-15 10:05:32, "Xuyang"  wrote:

Hi, Timo, thanks for your advice.


I am considering splitting the existing FLIP into two (either keeping the 
existing FLIP or retiring it).
One of them covers completing the window TVF operators to support CDC (there 
are several small work items, such as window agg, window rank, window join, etc.; 
due to time constraints, the 1.19 version prioritizes window agg). The other 
covers HOP window TVF support for a size that is a non-integer multiple of the 
step. Once these two FLIPs are basically completed in 1.19, we can consider 
officially deprecating the old group window agg syntax in the release notes.


WDYT?




--

Best!
Xuyang





At 2023-12-14 17:51:01, "Timo Walther"  wrote:

Hi Xuyang,


I'm not splitting this flip is that all of these subtasks like session

window tvf and cdc support do not change the public interface and the
public syntax

Given the length of this mailing list discussion and the number of involved
people, I would strongly suggest simplifying the FLIP and giving it a
better title to make quicker progress. In general, we all seem to be on
the same page in what we want. And both session TVF support and the
deprecation of the legacy group windows have been voted on already and
discussed thoroughly. The FLIP can purely focus on the CDC topic.

Cheers,
Timo


On 14.12.23 08:35, Xuyang wrote:

Hi, Timo, Sergey and Lincoln Lee. Thanks for your feedback.



In my opinion the FLIP touches too many
topics at the same time and should be split into multiple FLIPs. We should
stay focused on what is needed for Flink 2.0.

The main goal and topic of this FLIP is to align the abilities of the 
legacy group window agg syntax and the new window TVF syntax,
so that we can then say that the legacy window syntax is deprecated. IMO, 
although there are many misalignments between these two
syntaxes, such as session window TVF, CDC support and so on, they are all 
subtasks we need to do in this FLIP. Another reason I'm not
splitting this FLIP is that none of these subtasks, like session window TVF and 
CDC support, change the public interface or the public
syntax; their implementations will only be in the table-planner and 
table-runtime modules.



Can we postpone this discussion? Currently we should focus on users
switching to Window TVFs before Flink 2.0. Early fire, late fire, and allow
lateness have not been exposed through public configuration. They can be
introduced after Flink 2.0 is released.



Agree with you. This FLIP will not and should not expose these experimental 
Flink configs to users. I list them in this FLIP just to show the
misalignments between these two window syntaxes.


Looking forward to your thoughts.




--

  Best!
  Xuyang





At 2023-12-13 15:40:16, "Lincoln Lee"  wrote:

Thanks Xuyang for driving this work! It's great that everyone agrees with the
work itself in this flip[1]!

Regarding whether to split the flip or adjust the scope of this flip, I'd
like to share some thoughts:

1. About the title of this flip, what I want to say is that flip-145[2] had
marked the legacy group window deprecated in the documentation but the
functionality of the new syntax is not aligned with the legacy one.
This is not a user-friendly deprecation, so the initiation of this flip, as
I understand it, is for the formal deprecation of the legacy window, which
requires us to complete the functionality alignment.

2. Agree with Timo that we can process the late-fire/early-fire features
separately. These experimental parameters have not been officially opened
to users.
Considering the workload, we can focus more on this version.

3. I have no objection to splitting this flip if everyone feels that the
work included is too much.
Regarding the support of session tvf, it seems that the main problem is
that this part of the description occupies a large part of the flip,
causing some misunderstandings.
This is indeed a predetermined task in FLIP-145, just adding more
explanation about semantics. In addition, I saw the discussion history in
FLINK-24024[3], thanks Sergey for being willing to help driving this work
together.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-392%3A+Deprecate+the+Legacy+Group+Window+Aggregation
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
[3] https://issues.apache.org/jira/browse/FLINK-24024

Best,
Lincoln Lee


Sergey Nuyanzin  wrote on Wed, Dec 13, 2023 at 08:02:


thanks for summarising Timo

+1 for splitting it in different FLIPs
and agree about having "SESSION Window TVF Aggregation"

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-19 Thread Timo Walther

> I would be totally fine with the first version only having ORDERED
> mode. For a v2, we could attempt to do the next most conservative
> thing

Sounds good to me.

I also checked AsyncWaitOperator and could not find an access of 
StreamRecord's timestamp, only watermarks. But as we said, let's 
focus on ORDERED first.


Can you remove the now-unnecessary parts? Esp.:

@Override
public Set<FunctionRequirement> getRequirements() {
    return Collections.singleton(FunctionRequirement.ORDERED);
}

Otherwise I have no objections to start a VOTE soonish. If others are 
fine as well?


Regards,
Timo


On 19.12.23 07:32, Alan Sheinberg wrote:

Thanks for the helpful comments, Xuyang and Timo.

@Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as
the source and MySQL as the sink as an example.
Although Kafka is an append-only source, one of its fields is used as the primary key
when writing to MySQL. If an async udx is executed
in unordered mode, there may be problems with the data that ends up in MySQL.
In this case, we actually need to ensure that
rows are ordered per sink primary key.



@Xuyang: That's a great point.  If some node downstream of my operator
cares about ordering, there's no way for it to reconstruct the original
ordering of the rows as they were input to my operator.  So even if they
want to preserve ordering by key, the order in which they see it may
already be incorrect.  Somehow I thought that maybe the analysis of the
changelog mode at a given operator was aware of downstream operations, but
it seems not.

Clear "no" on this. Changelog semantics make the planner complex and we

need to be careful. Therefore I would strongly suggest we introduce
ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
plans with appropriate planner rules that guard it.



@Timo: The better I understand the complexity, the more I agree with this.
I would be totally fine with the first version only having ORDERED mode.
For a v2, we could attempt to do the next most conservative thing and only
allow UNORDERED when the whole graph is in *INSERT* changelog mode.  The
next best type of optimization might understand what's the key required
downstream, and allow breaking the original order only between unrelated
keys, but maintaining it between rows of the same key.  Of course if the
key used downstream is computed in some manner, that makes it all the
harder to know this beforehand.

So unordering should be fine *within* watermarks. This is also what

watermarks are good for, a trade-off between strict ordering and making
progress. The async operator from DataStream API also supports this if I
remember correctly. However, it assumes a timestamp is present in
StreamRecord on which it can work. But this is not the case within the
SQL engine.



*AsyncWaitOperator* and *UnorderedStreamElementQueue* (the implementations
I plan on using) seem to support exactly this behavior.  I don't think it
makes assumptions about the record's timestamp, but just preserves whatever
the input order is w.r.t watermarks.  I'd be curious to understand the
timestamp use in more detail and see if it's required with the mentioned
classes.
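
For reference, a minimal DataStream-level sketch of that behavior using the existing AsyncDataStream API (the async function is just a placeholder); unordered results are still constrained by watermarks:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class UnorderedAsyncExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);

        AsyncFunction<Integer, Integer> asyncDouble = new AsyncFunction<Integer, Integer>() {
            @Override
            public void asyncInvoke(Integer value, ResultFuture<Integer> resultFuture) {
                CompletableFuture
                        .supplyAsync(() -> value * 2) // placeholder for a real async call
                        .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
            }
        };

        // Results may be emitted out of order, but never across a watermark boundary.
        AsyncDataStream.unorderedWait(input, asyncDouble, 10, TimeUnit.SECONDS, 100).print();

        env.execute();
    }
}
```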

TLDR: Let's focus on ORDERED first.


I'm more than happy to start here and we can consider UNORDERED as a
followup.  Then maybe we consider only INSERT mode graphs and ones where we
can solve the watermark constraints.

Thanks,
Alan


On Mon, Dec 18, 2023 at 2:36 AM Timo Walther  wrote:


Hi Xuyang and Alan,

thanks for this productive discussion.

  > Would it make a difference if it were exposed by the explain

@Alan: I think this is great idea. +1 on exposing the sync/async
behavior thought EXPLAIN.


  > Is there an easy way to determine if the output of an async function
  > would be problematic or not?

Clear "no" on this. Changelog semantics make the planner complex and we
need to be careful. Therefore I would strongly suggest we introduce
ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
plans with appropriate planner rules that guard it.

  > If the input to the operator is append-only, it seems fine, because
  > this implies that each row is effectively independent and ordering is
  > unimportant.

As @Xuyang pointed out, it's not only the input that decides whether
append-only is safe. It's also the subsequent operators in the pipeline.
The example of Xuyang is a good one, when the sink operates in upsert
mode. Append-only source, append-only operators, and append-only sink
are safer.

However, even in this combination, a row is not fully "independent";
there are still watermarks flowing between rows:

R(5), W(4), R(3), R(4), R(2), R(1), W(0)

So unordering should be fine *within* watermarks. This is also what
watermarks are good for, a trade-off between strict ordering and making
progress. The async operator from DataStream API also supports this if I
remember correctly. However, it assumes a 

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-18 Thread Timo Walther
throwing an exception to make it clear to users that using async

scalar functions in this situation
is problematic instead of executing silently in sync mode? Because users
may be confused about
the final actual job graph.



@Xuyang: Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)?  I'd be fine to do it
either way -- certainly throwing an error is a bit simpler.

You are right. Actually it should be the planner that fully decides

whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output.



@Timo: Is there an easy way to determine if the output of an async function
would be problematic or not?  If the input to the operator is append-only,
it seems fine, because this implies that each row is effectively
independent and ordering is unimportant. For upsert mode with +U rows, you
wouldn't want to swap order with other +U rows for the same key because the
last one should win.  For -D or -U rows, you wouldn't want to swap with
other rows for the same key for similar reasons.  Is it as simple as
looking for the changelog mode to determine whether it's safe to run async
functions UNORDERED?  I had considered analyzing various query forms (join
vs aggregation vs whatever), but it seems like changelog mode could be
sufficient to understand what works and what would be an issue.  Any code
pointers and explanation for similar analysis would be great to understand
this more.

The mode UNORDERED however should only have

effect for these simple use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.


@Timo: Should we throw errors or run in sync mode?  It seems like running
in sync mode is an option to ensure correctness in all changelog modes.

Let's go with global configuration first and later introduce

hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.


@Timo: That seems like a reasonable approach to me.

-Alan

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/

On Fri, Dec 15, 2023 at 2:56 AM Timo Walther  wrote:


1. Override the function `getRequirements` in `AsyncScalarFunction`

  > If the user overrides `requirements()` to omit the `ORDERED`
  > requirement, do we allow the operator to return out-of-order results
  > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
  > behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides
whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output. The mode UNORDERED however should only have
effect for these simple use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.

2. In some scenarios with semantic correctness, async operators must be
executed in sync mode.

  > What about throwing an exception to make it clear to users that using
async scalar functions

@Xuyang: A regular SQL user doesn't care whether the function is sync or
async. The planner should simply give its best to make the execution
performant. I would not throw an exception here. The more exceptions
there are, the more struggles and questions from the user. Conceptually, we
can also run async code synchronously, which is why we should do so to
avoid errors.

3. Hints

@Aitozi: Let's go with global configuration first and later introduce
hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.

Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:

Hi, Alan. Thanks for driving this.


Using async to improve throughput has been done on lookup join, and the

improvement

effect is obvious, so I think it makes sense to support async scalar

function.  Big +1 for this flip.

I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I’m just curious why you don’t use conf(global) and query

hint(individual async udx) to mark the output

mode 'order' or 'unorder' like async lookup join [1] and async udtf [2],

but chose to introduce a new enum

in AsyncScalarFunction.


2. In some scenarios with semantic correctness, async operators must be

executed in sync mode.



What about throwing an exception to make it clear to users that using

async scalar functions in this situation

is problematic instead of executing silently in sync mode? Because users

may be confused about

the final actual job graph.


[1]

ht

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-15 Thread Timo Walther

1. Override the function `getRequirements` in `AsyncScalarFunction`

> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides 
whether ORDERED or UNORDERED is safe to do. For example, if the query is 
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the 
operator is not allowed to produce unordered results. By global 
configuration, we can set ORDERED such that users don't get confused 
about the unordered output. The mode UNORDERED however should only have 
effect for these simple use cases and throw an exception if UNORDERED 
would mess up a changelog or other subsequent operators.


2. In some scenarios with semantic correctness, async operators must be 
executed in sync mode.


> What about throwing an exception to make it clear to users that using 
async scalar functions


@Xuyang: A regular SQL user doesn't care whether the function is sync or 
async. The planner should simply give its best to make the execution 
performant. I would not throw an exception here. The more exceptions 
there are, the more struggles and questions from the user. Conceptually, we 
can also run async code synchronously, which is why we should do so to 
avoid errors.


3. Hints

@Aitozi: Let's go with global configuration first and later introduce 
hints. I feel the more hints we introduce, the harder SQL queries get 
when maintaining them.


Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:

Hi, Alan. Thanks for driving this.


Using async to improve throughput has been done on lookup join, and the 
improvement
effect is obvious, so I think it makes sense to support async scalar function.  
Big +1 for this flip.
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I’m just curious why you don’t use conf(global) and query hint(individual async 
udx) to mark the output
mode 'order' or 'unorder' like async lookup join [1] and async udtf [2], but chose 
to introduce a new enum
in AsyncScalarFunction.


2. In some scenarios with semantic correctness, async operators must be 
executed in sync mode.


What about throwing an exception to make it clear to users that using async 
scalar functions in this situation
is problematic instead of executing silently in sync mode? Because users may be 
confused about
the final actual job graph.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

 Best!
 Xuyang





At 2023-12-15 11:20:24, "Aitozi"  wrote:

Hi Alan,
Nice FLIP. I also explored leveraging the async table function [1] to
improve throughput before.

About the configs, what do you think about using hints, as mentioned in [1]?

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

Timo Walther  wrote on Thu, Dec 14, 2023 at 17:29:


Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be in particular interesting for
accessing REST endpoints and other remote services.

Great that we can generalize and reuse parts of the Python planner rules
and code for this.

I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class which is required in the layered configuration. For
now I would suggest to only allow a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).

2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.

I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully self
contained and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements():
Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overwritten and return a set without this requirement if the user
intends to do this.


Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:

+1 to t

Re: [DISCUSS] FLIP-387: Support named parameters for functions and call procedures

2023-12-14 Thread Timo Walther

Hi Feng,

thank you for proposing this FLIP. This nicely completes FLIP-65 which 
is great for usability.


I read the FLIP and have some feedback:


1) ArgumentNames annotation

> Deprecate the ArgumentNames annotation as it is not user-friendly for 
specifying argument names with optional configuration.


Which annotation does the FLIP reference here? I cannot find it in the 
Flink code base.


2) Evolution of FunctionHint

Introducing @ArgumentHint makes a lot of sense to me. However, using it 
within @FunctionHint looks complex, because there is both `input=` and 
`arguments=`. Ideally, the @DataTypeHint can be defined inline as part 
of the @ArgumentHint. It could even be the `value` such that 
`@ArgumentHint(@DataTypeHint("INT"))` is valid on its own.


We could deprecate `input=`. Or let both `input` and `arguments=` 
coexist but never be defined at the same time.


3) Semantical correctness

As you can see in the `TypeInference` class, named parameters are 
prepared in the stack already. However, we need to distinguish between 
the helpful explanation (see `InputTypeStrategy#getExpectedSignatures`) and 
the named parameters (see `TypeInference.Builder#namedArguments`) that can 
be used in SQL.


If I remember correctly, named parameters can be reordered and don't 
allow overloading of signatures. Thus, only a single eval() should have 
named parameters. Looking at the FLIP it seems you would like to support 
multiple parameter lists. What changes are you planning for TypeInference 
(which is also declared as @PublicEvolving)? This should also be 
documented, as the annotations compile into this class.
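
As a point of reference, a rough sketch of how named and typed arguments already surface in the TypeInference builder for a single, non-overloaded eval (hand-written inference; names below are illustrative):

```java
import java.util.Arrays;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;

public class MyNamedFunction extends ScalarFunction {

    public String eval(String in1, String in2) {
        return in1 + "," + in2;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        // A single parameter list: named + typed arguments and an explicit output type.
        return TypeInference.newBuilder()
                .namedArguments(Arrays.asList("in1", "in2"))
                .typedArguments(Arrays.asList(DataTypes.STRING(), DataTypes.STRING()))
                .outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING()))
                .build();
    }
}
```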


In general, I would prefer to keep it simple and not allow overloading of 
named parameters. With the optionality, users can add an arbitrary 
number of parameters to the signature of the same eval method.


Regards,
Timo

On 14.12.23 10:02, Feng Jin wrote:

Hi all,


Xuyang and I would like to start a discussion of FLIP-387: Support
named parameters for functions and call procedures [1]

Currently, when users call a function or a procedure, they must
specify all fields in order. When there are a large number of
parameters, it is easy to make mistakes, and non-mandatory fields
cannot be omitted.

By using named parameters, you can selectively specify the required
parameters, reducing the probability of errors and making it more
convenient to use.

Here are some examples of using named parameters:
```
-- for scalar function
SELECT my_scalar_function(param1 => 'value1', param2 => 'value2') FROM []

-- for table function
SELECT  *  FROM TABLE(my_table_function(param1 => 'value1', param2 => 'value2'))

-- for agg function
SELECT my_agg_function(param1 => 'value1', param2 => 'value2') FROM []

-- for call procedure
CALL procedure_name(param1 => 'value1', param2 => 'value2')
```

For UDX and Call procedure developers, we introduce a new annotation
to specify the parameter name, indicate if it is optional, and
potentially support specifying default values in the future

```
public @interface ArgumentHint {
    /**
     * The name of the parameter, default is an empty string.
     */
    String name() default "";

    /**
     * Whether the parameter is optional, default is true.
     */
    boolean isOptional() default true;
}
```

```
// Call Procedure Development

public static class NamedArgumentsProcedure implements Procedure {

    // Example usage: CALL myNamedProcedure(in1 => 'value1', in2 => 'value2')
    // Example usage: CALL myNamedProcedure(in1 => 'value1', in2 => 'value2', in3 => 'value3')
    @ProcedureHint(
            input = {@DataTypeHint(value = "STRING"),
                    @DataTypeHint(value = "STRING"), @DataTypeHint(value = "STRING")},
            output = @DataTypeHint("STRING"),
            arguments = {
                @ArgumentHint(name = "in1", isOptional = false),
                @ArgumentHint(name = "in2", isOptional = true),
                @ArgumentHint(name = "in3", isOptional = true)})
    public String[] call(ProcedureContext procedureContext, String arg1, String arg2, String arg3) {
        return new String[]{arg1 + ", " + arg2 + "," + arg3};
    }
}
```


Currently, we offer support for two scenarios when calling a function
or procedure:

1. The corresponding parameters can be specified using the parameter
name, without a specific order.
2. Unnecessary parameters can be omitted.


There are still some limitations when using Named parameters:
1. Named parameters do not support variable arguments.
2. UDX or procedure classes that support named parameters can only
have one eval method.
3. Due to the current limitations of CALCITE-947 [2], we cannot specify
a default value for omitted parameters; they are NULL by default.



Also, thanks very much for the suggestions and help provided by Zelin
and Lincoln.




1. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures.

2. 

Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2023-12-14 Thread Timo Walther

Hi Xuyang,

> I'm not splitting this flip is that all of these subtasks like session 
window tvf and cdc support do not change the public interface and the 
public syntax


Given the length of this mailing list discussion and the number of involved 
people, I would strongly suggest simplifying the FLIP and giving it a 
better title to make quicker progress. In general, we all seem to be on 
the same page in what we want. And both session TVF support and the 
deprecation of the legacy group windows have been voted on already and 
discussed thoroughly. The FLIP can purely focus on the CDC topic.


Cheers,
Timo


On 14.12.23 08:35, Xuyang wrote:

Hi, Timo, Sergey and Lincoln Lee. Thanks for your feedback.



In my opinion the FLIP touches too many
topics at the same time and should be split into multiple FLIPs. We should 
stay focused on what is needed for Flink 2.0.

The main goal and topic of this FLIP is to align the abilities of the 
legacy group window agg syntax and the new window TVF syntax,
so that we can then say that the legacy window syntax is deprecated. IMO, 
although there are many misalignments between these two
syntaxes, such as session window TVF, CDC support and so on, they are all 
subtasks we need to do in this FLIP. Another reason I'm not
splitting this FLIP is that none of these subtasks, like session window TVF and 
CDC support, change the public interface or the public
syntax; their implementations will only be in the table-planner and 
table-runtime modules.



Can we postpone this discussion? Currently we should focus on users
switching to Window TVFs before Flink 2.0. Early fire, late fire, and allow 
lateness have not been exposed through public configuration. They can be introduced 
after Flink 2.0 is released.



Agree with you. This FLIP will not and should not expose these experimental 
Flink configs to users. I list them in this FLIP just to show the
misalignments between these two window syntaxes.


Looking forward to your thoughts.




--

 Best!
 Xuyang





At 2023-12-13 15:40:16, "Lincoln Lee"  wrote:

Thanks Xuyang for driving this work! It's great that everyone agrees with the
work itself in this flip[1]!

Regarding whether to split the flip or adjust the scope of this flip, I'd
like to share some thoughts:

1. About the title of this flip, what I want to say is that flip-145[2] had
marked the legacy group window deprecated in the documentation but the
functionality of the new syntax is not aligned with the legacy one.
This is not a user-friendly deprecation, so the initiation of this flip, as
I understand it, is for the formal deprecation of the legacy window, which
requires us to complete the functionality alignment.

2. Agree with Timo that we can process the late-fire/early-fire features
separately. These experimental parameters have not been officially opened
to users.
Considering the workload, we can focus more on this version.

3. I have no objection to splitting this flip if everyone feels that the
work included is too much.
Regarding the support of session tvf, it seems that the main problem is
that this part of the description occupies a large part of the flip,
causing some misunderstandings.
This is indeed a predetermined task in FLIP-145, just adding more
explanation about semantics. In addition, I saw the discussion history in
FLINK-24024[3], thanks Sergey for being willing to help driving this work
together.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-392%3A+Deprecate+the+Legacy+Group+Window+Aggregation
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
[3] https://issues.apache.org/jira/browse/FLINK-24024

Best,
Lincoln Lee


Sergey Nuyanzin  wrote on Wed, Dec 13, 2023 at 08:02:


thanks for summarising Timo

+1 for splitting it in different FLIPs
and agree about having "SESSION Window TVF Aggregation" under FLIP-145
Moreover the task is already there, so no need to move it from one FLIP to
another


And actually Sergey Nuyanzin wanted to work
on this if I remember correctly. Not sure if this is still the case.


Yes, however it seems there is already an existing PR for that.
Anyway I'm happy to help here with the review as well and will reserve some
time for it in coming days.



On Tue, Dec 12, 2023 at 12:18 PM Timo Walther  wrote:


Hi Xuyang,

thanks for proposing this FLIP. In my opinion the FLIP touches too many
topics at the same time and should be split into multiple FLIPs. We
should stay focused on what is needed for Flink 2.0.

Let me summarizing the topics:

1) SESSION Window TVF Aggregation

This has been agreed in FLIP-145 already. We don't need another FLIP but
someone that finally implements this after we have performed the Calcite
upgrade a couple of months ago. The Calcite upgrade was important
exactly for SESSION windows. And actually Sergey Nuyanzin wanted to work
on this if I remember correctly. Not sure if this is still the case.

2) CDC support of

Re: [DISCUSS] Should Configuration support getting value based on String key?

2023-12-14 Thread Timo Walther
The configuration in Flink is complicated and I fear we won't have 
enough capacity to substantially fix it. The introduction of 
ReadableConfig, WritableConfig, and typed ConfigOptions was the right step 
toward making the code more maintainable. From the Flink side, every read 
access should go through ConfigOption.


However, I also understand Gyula's pragmatism here because (practically 
speaking) users get `getString()`-style access via `toMap().get()`. So I'm 
fine with removing the deprecation for functionality that is available 
anyway. We should, however, add a message to `getString()` that this 
method is discouraged and that `get(ConfigOption)` should be the preferred 
way of accessing Configuration.


In any case we should remove the getInt and related methods.
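
For illustration, a small sketch of the two access paths (the option key below is made up):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ConfigAccessExample {

    // Preferred: a typed, predefined option.
    private static final ConfigOption<String> ENDPOINT =
            ConfigOptions.key("my.connector.endpoint").stringType().noDefaultValue();

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(ENDPOINT, "http://localhost:8080");

        String viaOption = conf.get(ENDPOINT);                      // preferred access path
        String viaMap = conf.toMap().get("my.connector.endpoint");  // string-key access, still possible today

        System.out.println(viaOption + " / " + viaMap);
    }
}
```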

Cheers,
Timo


On 14.12.23 09:56, Gyula Fóra wrote:

I see a strong value for user facing configs to use ConfigOption and this
should definitely be an enforced convention.

However with the Flink project growing and many other components and even
users using the Configuration object I really don’t think that we should
“force” this on the users/developers.

If we make fromMap / toMap free with basically no overhead, that is fine
but otherwise I think it would hurt the user experience to remove the
simple getters / setters. Temporary configoptions to access strings from
what is practically string->string map is exactly the kind of unnecessary
boilerplate that every dev and user wants to avoid.

Gyula

There are many cases where the features of the configoption are really not
needed.

On Thu, 14 Dec 2023 at 09:38, Xintong Song  wrote:


Hi Gyula,

First of all, even if we remove the `getXXX(String key, XXX defaultValue)`
methods, there are still several ways to access the configuration with
string-keys.

- If one wants to access a specific option, as Rui mentioned,
`ConfigOptions.key("xxx").stringType().noDefaultValue()` can be used.
TBH, I can't think of a use case where a temporarily created ConfigOption is
preferred over a predefined one. Do you have any examples for that?
- If one wants to access the whole configuration set, then `toMap` or
`iterator` might be helpful.

It is true that these ways are less convenient than `getXXX(String key, XXX
defaultValue)`, and that's exactly my purpose, to make the key-string less
convenient so that developers choose ConfigOption over it whenever
possible.

there will always be cases where a more flexible

dynamic handling is necessary without the added overhead of the toMap

logic




I'm not sure about this. I agree there are cases where flexible and dynamic
handling is needed, but maybe "without the added overhead of the toMap
logic" is not that necessary?

I'd think of this as "encouraging developers to use ConfigOption as much as
possible" vs. "a bit less convenient in 5% of the cases". I guess there's
no right and wrong, just different engineer opinions. While I'm personally
stand with removing the string-key access methods, I'd also be fine with
the other way if there are more people in favor of it.

Best,

Xintong



On Thu, Dec 14, 2023 at 3:45 PM Gyula Fóra  wrote:


Hi Xintong,

I don’t really see the actual practical benefit from removing the
getString and setString low-level methods.

I understand that ConfigOptions are nicer for 95% of the cases but from a
technical point of view there will always be cases where a more flexible
dynamic handling is necessary without the added overhead of the toMap
logic.

I think it’s the most natural thing for any config abstraction to expose
basic get set methods with a simple key.

What do you think?

Cheers
Gyula

On Thu, 14 Dec 2023 at 08:00, Xintong Song 

wrote:




IIUC, you both prefer using ConfigOption instead of string keys for
all use cases, even internal ones. We can even forcefully delete
these @Deprecated methods in Flink 2.0 to guide users or
developers to use ConfigOption.



Yes, at least from my side.


I noticed that Configuration is used in

DistributedCache#writeFileInfoToConfig and readFileInfoFromConfig
to store some cacheFile meta-information. Their keys are
temporary (key names with numbers) and it is not convenient
to predefine ConfigOption.



True, this one requires a bit more effort to migrate from string-key to
ConfigOption, but still should be doable. Looking at how the two mentioned
methods are implemented and used, it seems what is really needed is
serialization and deserialization of `DistributedCacheEntry`-s. And all the
entries are always written / read at once. So I think we can serialize the
whole set of entries into a JSON string (or something similar), and use one
ConfigOption with a deterministic key for it, rather than having one
ConfigOption for each field in each entry. WDYT?
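
A rough sketch of that direction, assuming Jackson is available on the classpath and using a made-up option key (not an actual Flink option):

```java
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class CacheEntrySerialization {

    // One deterministic key for the whole entry set (hypothetical name).
    private static final ConfigOption<String> CACHE_ENTRIES =
            ConfigOptions.key("pipeline.cached-files.entries").stringType().noDefaultValue();

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Serialize all entries at once into a single JSON-valued option.
    static void write(Map<String, DistributedCacheEntry> entries, Configuration conf) throws Exception {
        conf.set(CACHE_ENTRIES, MAPPER.writeValueAsString(entries));
    }
}
```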


If everyone agrees with this direction, we can start to refactor all

code that uses getXxx(String key, String defaultValue) into
getXxx(ConfigOption configOption), and completely
delete all getXxx(String key, 

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Timo Walther

Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has 
been requested multiple times. It will be in particular interesting for 
accessing REST endpoints and other remote services.


Great that we can generalize and reuse parts of the Python planner rules 
and code for this.


I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space 
should remain constant. Only a constant key space enables the use of the 
ConfigOption class which is required in the layered configuration. For 
now I would suggest to only allow a global setting for buffer capacity, 
timeout, and retry-strategy. We can later work on a per-function 
configuration (potentially also needed for other use cases).
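
For illustration only, a sketch of setting such global options from the Table API; the keys follow the FLIP's proposal and may change before release:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncScalarConfigExample {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Global settings only; per-function keys are intentionally not used here.
        env.getConfig().set("table.exec.async-scalar.buffer-capacity", "10");
        env.getConfig().set("table.exec.async-scalar.timeout", "30 s");
    }
}
```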


2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined 
per-function. It impacts the query result and potentially the behavior 
of planner rules.


I see two options for this: either (a) an additional method in 
AsyncScalarFunction or (b) adding this to the function's requirements. I 
vote for (b), because a FunctionDefinition should be fully self 
contained and sufficient for planning.


Thus, for `FunctionDefinition.getRequirements(): 
Set<FunctionRequirement>` we can add a new requirement `ORDERED` which 
should also be the default for AsyncScalarFunction. `getRequirements()` 
can be overwritten and return a set without this requirement if the user 
intends to do this.



Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:

+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg 
wrote:



Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can

find

another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)


Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
believe that the python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base and
async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes 
wrote:


Hi Alan,

Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can

find

another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
 wrote:


I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1]

This feature proposes adding a new UDF type AsyncScalarFunction which

is

invoked just like a normal ScalarFunction, but is implemented with an
asynchronous eval method.  I had brought this up including the

motivation

in a previous discussion thread [2].

The purpose is to achieve high throughput scalar function UDFs while
allowing that an individual call may have high latency.  It allows

scaling

up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).

In practice, it should enable SQL integration with external services

and

systems, which Flink has limited support for at the moment. It should

also

allow easier integration with existing libraries which use asynchronous
APIs.

Looking forward to your feedback and suggestions.

[1]





https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support

<




https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support




[2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs


Thanks,
Alan











Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2023-12-12 Thread Timo Walther

Hi Xuyang,

thanks for proposing this FLIP. In my opinion the FLIP touches too many 
topics at the same time and should be split into multiple FLIPs. We 
should stay focused on what is needed for Flink 2.0.


Let me summarizing the topics:

1) SESSION Window TVF Aggregation

This has been agreed in FLIP-145 already. We don't need another FLIP but 
someone that finally implements this after we have performed the Calcite 
upgrade a couple of months ago. The Calcite upgrade was important 
exactly for SESSION windows. And actually Sergey Nuyanzin wanted to work 
on this if I remember correctly. Not sure if this is still the case.


2) CDC support of Window TVFs

This can be a FLIP on its own.

3) HOP window size with non-integer step length

This can be a FLIP on its own.

4) Configurations such as early fire, late fire and allow lateness

Can we postpone this discussion? Currently we should focus on users 
switching to Window TVFs before Flink 2.0. Early fire, late fire, and 
allow lateness have not been exposed through public configuration. They can be 
introduced after Flink 2.0 is released.


Regards,
Timo


On 12.12.23 08:01, Xuyang wrote:

Hi, Jim.
Thanks for your explanation.

Ah, I mean to ask if you can contribute the new SESSION Table support
without needing FLIP-392 completely settled.  I was trying to see if that
is separate work which can be done or if there is some dependency on this
FLIP.

The PR available for session window TVF belongs to this FLIP I think, and is 
part of the work for this FLIP. Actually the PoC PR is not completely ready 
yet;
I'll try to update it to implement the session window TVF in the window agg 
operator instead of using the legacy group window agg operator.

The tests should not be impacted.  Depending on what order our work lands
in, one of the tests you've added/updated would likely move to the
RestoreTests that Bonnie and I are working on.  Just mentioning that ahead
of time


Got it! I will pay attention to it.




--

 Best!
 Xuyang





At 2023-12-11 21:35:00, "Jim Hughes"  wrote:

Hi Xuyang,

On Sun, Dec 10, 2023 at 10:41 PM Xuyang  wrote:


Hi, Jim.

As a clarification, since FLINK-24204 is finishing up work from
FLIP-145[1], do we need to discuss anything before you work out the

details

of FLINK-24024 as a PR?

Which issue do you mean? It seems that FLINK-24204[1] is the issue with
table api type system.



Ah, I mean to ask if you can contribute the new SESSION Table support
without needing FLIP-392 completely settled.  I was trying to see if that
is separate work which can be done or if there is some dependency on this
FLIP.



I've got a PR up [3] for moving at least one of the classes you are
touching.
Nice work! Since we are not going to delete the legacy group window agg
operator actually, the only compatibility issue
may be that when using flink sql, the legacy group window agg operator
will be rewritten into new operators. Will these tests be affected by
this rewriting?



The tests should not be impacted.  Depending on what order our work lands
in, one of the tests you've added/updated would likely move to the
RestoreTests that Bonnie and I are working on.  Just mentioning that ahead
of time

Cheers,

Jim





[1] https://issues.apache.org/jira/browse/FLINK-24204






--

 Best!
 Xuyang





At 2023-12-09 06:25:30, "Jim Hughes"  wrote:

Hi Xuyang,

As a clarification, since FLINK-24204 is finishing up work from
FLIP-145[1], do we need to discuss anything before you work out the

details

of FLINK-24024 as a PR?

Relatedly, as that goes up for a PR, as part of FLINK-33421 [2], Bonnie

and

I are working through migrating some of the JsonPlan Tests and ITCases to
RestoreTests.  I've got a PR up [3] for moving at least one of the classes
you are touching.  Let me know if I can share any details about that work.

Cheers,

Jim

1.


https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-SessionWindows


2. https://issues.apache.org/jira/browse/FLINK-33421
3. https://github.com/apache/flink/pull/23886
https://issues.apache.org/jira/browse/FLINK-33676

On Tue, Nov 28, 2023 at 7:31 AM Xuyang  wrote:


Hi all.
I'd like to start a discussion of FLIP-392: Deprecate the Legacy Group
Window Aggregation.


Although the current Flink SQL Window Aggregation documentation[1]
indicates that the legacy Group Window Aggregation
syntax has been deprecated, the new Window TVF Aggregation syntax has

not

fully covered all of the features of the legacy one.


Compared to Group Window Aggregation, Window TVF Aggregation has several
advantages, such as two-stage optimization,
support for standard GROUPING SET syntax, and so on. However, it needs

to

supplement and enrich the following features.


1. Support for SESSION Window TVF Aggregation
2. Support for consuming CDC stream
3. Support for HOP window size with non-integer step length
4. Support for configurations such as 

[jira] [Created] (FLINK-33784) CatalogStoreFactory can not be configured via StreamExecutionEnvironment

2023-12-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-33784:


 Summary: CatalogStoreFactory can not be configured via 
StreamExecutionEnvironment
 Key: FLINK-33784
 URL: https://issues.apache.org/jira/browse/FLINK-33784
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


The logic in TableEnvironment.create() has a well-defined ordering which allows 
configuring most settings via StreamExecutionEnvironment and flink-conf.yaml. 
The discovery of CatalogStoreFactory should be postponed until the final 
configuration is merged.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Release Flink 1.18.1

2023-12-08 Thread Timo Walther

Thanks for taking care of this Jing.

+1 to release 1.18.1 for this.

Cheers,
Timo


On 08.12.23 10:00, Benchao Li wrote:

I've merged FLINK-33313 to release-1.18 branch.

Péter Váry  wrote on Fri, Dec 8, 2023 at 16:56:


Hi Jing,
Thanks for taking care of this!
+1 (non-binding)
Peter

Sergey Nuyanzin  wrote (on Fri, Dec 8, 2023 at 9:36):


Thanks Jing for driving it
+1

also +1 to include FLINK-33313 mentioned by Benchao Li

On Fri, Dec 8, 2023 at 9:17 AM Benchao Li  wrote:


Thanks Jing for driving the 1.18.1 release.

I would like to include FLINK-33313 [1] in 1.18.1. It's just a bugfix,
not a blocker, but it's already merged into master; I plan to merge it
to the 1.18/1.17 branches today after the CI passes.

[1] https://issues.apache.org/jira/browse/FLINK-33313

Jing Ge  wrote on Fri, Dec 8, 2023 at 16:06:


Hi all,

I would like to discuss creating a new 1.18 patch release (1.18.1). The
last 1.18 release is nearly two months old, and since then, 37 tickets have
been closed [1], of which 6 are blocker/critical [2]. Some of them are
quite important, such as FLINK-33598 [3].

The most urgent and important one is FLINK-33523 [4]; according to the
discussion thread [5] on the ML, 1.18.1 should/must be released asap after
the breaking change commit has been reverted.

I am not aware of any other unresolved blockers and there are no
in-progress tickets [6].
Please let me know if there are any issues you'd like to be included in
this release but still not merged.

If the community agrees to create this new patch release, I could
volunteer as the release manager.

Best regards,
Jing

[1]




https://issues.apache.org/jira/browse/FLINK-33567?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.18.1%20%20and%20resolution%20%20!%3D%20%20Unresolved%20order%20by%20priority%20DESC

[2]




https://issues.apache.org/jira/browse/FLINK-33693?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.18.1%20and%20resolution%20%20!%3D%20Unresolved%20%20and%20priority%20in%20(Blocker%2C%20Critical)%20ORDER%20by%20priority%20%20DESC

[3] https://issues.apache.org/jira/browse/FLINK-33598
[4] https://issues.apache.org/jira/browse/FLINK-33523
[5] https://lists.apache.org/thread/m4c879y8mb7hbn2kkjh9h3d8g1jphh3j
[6] https://issues.apache.org/jira/projects/FLINK/versions/12353640
Thanks,




--

Best,
Benchao Li




--
Best regards,
Sergey









Re: SQL return type change from 1.17 to 1.18

2023-12-07 Thread Timo Walther

Hi Peter,

thanks for reaching out to the Flink community. This is indeed a serious 
issue. As the author of the Flink type system, DataType and many related 
utilities I strongly vote for reverting FLINK-33523:


- It changes the Flink type system without a FLIP.
- It breaks backwards compatibility with UDFs and connectors.

Regards,
Timo

On 07.12.23 07:38, Péter Váry wrote:

Hi Team,

We are working on upgrading the Iceberg-Flink connector from 1.17 to 1.18,
and found that some of our tests are failing. Prabhu Joseph created a jira
[1] to discuss this issue, along with short example code.

In a nutshell:
- Create a table with an 'ARRAY' column
- Run a select which returns this column
- The return type changes:
 - From 'Object[]' - in 1.17
 - To 'int[]' - in 1.18

The change is introduced by this jira [2].
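
A minimal reproduction sketch of the kind of check described above (illustrative only; the observed runtime class of the array is exactly what differs between 1.17 and 1.18.0):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class ArrayTypeCheck {
    public static void main(String[] args) throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // ARRAY[1, 2, 3] has a NOT NULL element type, which is what triggers the difference.
        try (CloseableIterator<Row> it = env.executeSql("SELECT ARRAY[1, 2, 3] AS arr").collect()) {
            Object arr = it.next().getField(0);
            // Reported behavior: Object[] (Integer[]) on 1.17 vs. a primitive int[] on 1.18.0.
            System.out.println(arr.getClass());
        }
    }
}
```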

While I understand the reasoning behind this change, this will break some
users existing workflow as evidenced by Xingcan Cui finding this
independently [3].

What is the opinion of the community about this change?
- Do we want to revert the change?
- Do we ask the owners of the change to make this behavior configurable?
- Do we accept this behavior change in a minor release?

Thanks,
Peter

[1] - https://issues.apache.org/jira/browse/FLINK-33523 - DataType
ARRAY fails to cast into Object[]
[2] - https://issues.apache.org/jira/browse/FLINK-31835 - DataTypeHint
don't support Row>
[3] - https://issues.apache.org/jira/browse/FLINK-33547 - SQL primitive
array type after upgrading to Flink 1.18.0





[jira] [Created] (FLINK-33666) MergeTableLikeUtil uses different constraint name than Schema

2023-11-27 Thread Timo Walther (Jira)
Timo Walther created FLINK-33666:


 Summary: MergeTableLikeUtil uses different constraint name than 
Schema
 Key: FLINK-33666
 URL: https://issues.apache.org/jira/browse/FLINK-33666
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther


{{MergeTableLikeUtil}} uses a different algorithm to name constraints than 
{{Schema}}. 

{{Schema}} includes the column names.
{{MergeTableLikeUtil}} uses a hashCode which means it might depend on JVM 
specifics.

For consistency we should use the same algorithm. I propose to use {{Schema}}'s 
logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-393: Make QueryOperations SQL serializable

2023-11-21 Thread Timo Walther

+1 (binding)

Thanks for working on this FLIP. It will be a nice continuation of the 
previous work.


Cheers,
Timo


On 21.11.23 13:19, Gyula Fóra wrote:

+1 (binding)

Gyula

On Tue, 21 Nov 2023 at 13:11, xiangyu feng  wrote:


+1 (non-binding)

Thanks for driving this.

Best,
Xiangyu Feng


Ferenc Csaky  于2023年11月21日周二 20:07写道:


+1 (non-binding)

Looking forward to this!

Best,
Ferenc




On Tuesday, November 21st, 2023 at 12:21, Martijn Visser <
martijnvis...@apache.org> wrote:





+1 (binding)

Thanks for driving this.

Best regards,

Martijn

On Tue, Nov 21, 2023 at 12:18 PM Benchao Li libenc...@apache.org

wrote:



+1 (binding)

Dawid Wysakowicz wysakowicz.da...@gmail.com 于2023年11月21日周二 18:56写道:


Hi everyone,

Thank you to everyone for the feedback on FLIP-393: Make

QueryOperations

SQL serializable[1]
which has been discussed in this thread [2].

I would like to start a vote for it. The vote will be open for at

least 72

hours unless there is an objection or not enough votes.

[1] https://cwiki.apache.org/confluence/x/vQ2ZE
[2]

https://lists.apache.org/thread/ztyk68brsbmwwo66o1nvk3f6fqqhdxgk


--

Best,
Benchao Li










[RESULT][VOTE] FLIP-376: Add DISTRIBUTED BY clause

2023-11-09 Thread Timo Walther

Hi everyone,

The voting time for [VOTE] FLIP-376: Add DISTRIBUTED BY clause[1] has 
passed. I'm closing the vote now.


There were 10 +1 votes, of which 8 were binding:

- Jing Ge (binding)
- Martijn Visser (binding)
- Lincoln Lee (binding)
- Benchao Li (binding)
- Dawid Wysakowicz (binding)
- Jim Hughes (non-binding)
- Jingsong Li (binding)
- Zhanghao Chen (non-binding)
- Sergey Nuyanzin (binding)
- Leonard Xu (binding)

There were no -1 votes.

Thus, FLIP-376 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

[1] https://lists.apache.org/thread/cft2d9jc2lr8gv6dyyz7b62188mf07sj

Cheers,
Timo


On 08.11.23 10:14, Leonard Xu wrote:

+1(binding)

Best,
Leonard


2023年11月8日 下午1:05,Sergey Nuyanzin  写道:

+1 (binding)

On Wed, Nov 8, 2023 at 6:02 AM Zhanghao Chen 
wrote:


+1 (non-binding)

Best,
Zhanghao Chen

From: Timo Walther 
Sent: Monday, November 6, 2023 19:38
To: dev 
Subject: [VOTE] FLIP-376: Add DISTRIBUTED BY clause

Hi everyone,

I'd like to start a vote on FLIP-376: Add DISTRIBUTED BY clause[1] which
has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
[2] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1

Cheers,
Timo




--
Best regards,
Sergey






[jira] [Created] (FLINK-33497) Update the Kafka connector to support DISTRIBUTED BY clause

2023-11-09 Thread Timo Walther (Jira)
Timo Walther created FLINK-33497:


 Summary: Update the Kafka connector to support DISTRIBUTED BY 
clause
 Key: FLINK-33497
 URL: https://issues.apache.org/jira/browse/FLINK-33497
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Reporter: Timo Walther


The Kafka connector can be one of the first connectors supporting the 
DISTRIBUTED BY clause. The clause can be translated into 'key.fields' and 
'properties.num.partitions' in the WITH clause.
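
For example, a declaration like the following (table name is made up; the mapping
mirrors the options named above and is ultimately up to the connector
implementation):

CREATE TABLE kafka_sink (uid BIGINT, name STRING)
  DISTRIBUTED BY (uid) INTO 6 BUCKETS
  WITH ('connector' = 'kafka');

could roughly correspond to declaring the options directly:

CREATE TABLE kafka_sink (uid BIGINT, name STRING)
  WITH (
    'connector' = 'kafka',
    'key.fields' = 'uid',
    'properties.num.partitions' = '6'
  );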



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33496) Expose DISTRIBUTED BY clause via parser

2023-11-09 Thread Timo Walther (Jira)
Timo Walther created FLINK-33496:


 Summary: Expose DISTRIBUTED BY clause via parser
 Key: FLINK-33496
 URL: https://issues.apache.org/jira/browse/FLINK-33496
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


Expose DISTRIBUTED BY clause via parser and TableDescriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33495) Add catalog and connector ability API and validation

2023-11-09 Thread Timo Walther (Jira)
Timo Walther created FLINK-33495:


 Summary: Add catalog and connector ability API and validation
 Key: FLINK-33495
 URL: https://issues.apache.org/jira/browse/FLINK-33495
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Add API infra before adjusting the parser:

- CatalogTable
- CatalogTable.Builder
- TableDistribution
- SupportsBucketing

This includes validation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33494) FLIP-376: Add DISTRIBUTED BY clause

2023-11-09 Thread Timo Walther (Jira)
Timo Walther created FLINK-33494:


 Summary: FLIP-376: Add DISTRIBUTED BY clause
 Key: FLINK-33494
 URL: https://issues.apache.org/jira/browse/FLINK-33494
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Timo Walther






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-07 Thread Timo Walther

Thanks for the FLIP, Piotr.

In order to follow the FLIP process[1], please prefix the email subject 
with "[DISCUSS]".


Also, some people might have added filters to their email clients to 
highlight those discussions.


Thanks,
Timo

[1] 
https://cwiki.apache.org/confluence/display/Flink/Flink+Improvement+Proposals#FlinkImprovementProposals-Process


On 07.11.23 09:35, Piotr Nowojski wrote:

Hi all!

I would like to start a discussion on a follow up of FLIP-384: Introduce
TraceReporter and use it to create checkpointing and recovery traces [1]:

*FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter[
2]*

This FLIP proposes to add both MetricReporter and TraceReporter integrating
Flink with OpenTelemetry [4].

There is also another follow up FLIP-386 [3], which improves recovery
traces.

Please let me know what you think!

Best,
Piotr Nowojski

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
[4] https://opentelemetry.io/





[VOTE] FLIP-376: Add DISTRIBUTED BY clause

2023-11-06 Thread Timo Walther

Hi everyone,

I'd like to start a vote on FLIP-376: Add DISTRIBUTED BY clause[1] which 
has been discussed in this thread [2].


The vote will be open for at least 72 hours unless there is an objection 
or not enough votes.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause

[2] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1

Cheers,
Timo


Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-03 Thread Timo Walther

If there are no objections, I would start with a voting on Monday.

Thanks for the feedback everyone!

Regards,
Timo


On 02.11.23 13:49, Martijn Visser wrote:

Hi all,


From a user point of view, I think it makes sense to go for

DISTRIBUTED BY with how Timo explained it. +1 for his proposal

Best regards,

Martijn


On Thu, Nov 2, 2023 at 11:00 AM Timo Walther  wrote:


Hi Jing,

I agree this is confusing. The Spark API calls it bucketBy in the
programmatic API. But anyway, we should discuss the SQL semantics here.
It's like a "WHERE" is called "filter" in the programmatic world. Or a
"SELECT" is called "projection" in code.

And looking at all the Hive tutorials[1], distributed by should be more
consistent. By using the "INTO n BUCKETS", we still include the
bucketing terminology in the syntax for better understanding.

If there are no other objections to this topic, I would still prefer to
go with DISTRIBUTED BY.

Regards,
Timo

[1]
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/



On 01.11.23 11:55, Jing Ge wrote:

Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:


Hi Jing,

   > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

   > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

  - For advanced users, the algorithm can be defined explicitly.
  - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

> to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I up

[jira] [Created] (FLINK-33447) Avoid CompiledPlan recompilation during loading

2023-11-03 Thread Timo Walther (Jira)
Timo Walther created FLINK-33447:


 Summary: Avoid CompiledPlan recompilation during loading
 Key: FLINK-33447
 URL: https://issues.apache.org/jira/browse/FLINK-33447
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


{{StreamPlanner.loadPlan}} recompiles the loaded plan. This causes unnecessary 
computational overhead and should be removed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-02 Thread Timo Walther

Hi Jing,

I agree this is confusing. The Spark API calls it bucketBy in the 
programmatic API. But anyway, we should discuss the SQL semantics here. 
It's like a "WHERE" is called "filter" in the programmatic world. Or a 
"SELECT" is called "projection" in code.


And looking at all the Hive tutorials[1], distributed by should be more 
consistent. By using the "INTO n BUCKETS", we still include the 
bucketing terminology in the syntax for better understanding.


If there are no other objections to this topic, I would still prefer to 
go with DISTRIBUTED BY.


Regards,
Timo

[1] 
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/ 




On 01.11.23 11:55, Jing Ge wrote:

Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:


Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

 - For advanced users, the algorithm can be defined explicitly.
 - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.


Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-31 Thread Timo Walther

Hi Jark,

here are the checks I had in mind so far. But we can also discuss this 
during the implementation in the PRs. Most of the tasks are very similar 
to PARTITIONED BY which is also a characteristic of a sink.


1) Check that DISTRIBUTED BY columns reference physical columns and that at 
least one column is referenced. This happens in DefaultSchemaResolver, like we do for PARTITIONED BY.
2) Check that, if DISTRIBUTED is defined, the sink implements 
SupportsBucketing. This happens in DynamicSinkUtils, like we do for metadata columns.


Currently, for sources we would only check for semantic correctness 
(1) but not more, like we do for PARTITIONED BY.


Do you have more checks in mind? Of course, during implementation I will 
make sure that all derived utils will properly work; including CREATE 
TABLE LIKE.


Regards,
Timo


On 31.10.23 07:22, Jark Wu wrote:

Hi Timo,

Thank you for the update. The FLIP looks good to me now.
I only have one more question.

What does Flink check and throw exceptions for the bucketing?
For example, do we check interfaces when executing create/alter
DDL and when used as a source?

Best,
Jark

On Tue, 31 Oct 2023 at 00:25, Timo Walther  wrote:


Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

 - For advanced users, the algorithm can be defined explicitly.
 - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor com

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Jing,

> Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this 
concept "distribution".


In any case, the "BY" is optional, so certain DDL statements would 
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED, 
we should use the passive voice.


> Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning 
strategies (namely hash and range partitioning) if the connector offers 
more than one.


Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order to
make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao mentioned
which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if I
am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

- For advanced users, the algorithm can be defined explicitly.
- Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther  wrote:


Let me reply to the feedback from Yunfan:

  > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

  > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]

https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf


Let me reply to the feedback from Benchao:

  > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but
not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER

TABLE?

Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example,
StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:


https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets

[2]:


https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket


On Thu, 26 Oct 2023 at 18:26, Jingsong Li 

wrote:



Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector itself should perform the
keyBy. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther 

wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This 
discussion is about DDL. For DDL, we have more freedom as every vendor 
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly 
coupled to the connector implementation, not the engine. However, for 
DML we need to watch out for standard compliance and introduce changes 
with high caution.


How a LookupTableSource interprets the DISTRIBUTED BY is 
connector-dependent in my opinion. In general this FLIP is a sink 
ability, but we could have a follow-up FLIP that helps in distributing load 
of lookup joins.


> to avoid data skew problem

I understand the use case and that it is important to solve it 
eventually. Maybe a solution might be to introduce helper Polymorphic 
Table Functions [1] in the future instead of new syntax.


[1] 
https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink 
SQL engine. We are not using Flink's hash strategy in any way. If the 
hash strategy for the regular Flink file system connector should be 
changed, this should be expressed via config option. Otherwise we should 
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
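
As an illustration only, such a config option could look like this (the option
name 'sink.bucket-hash-function' is hypothetical and not part of the FLIP):

CREATE TABLE hive_compatible_sink (uid BIGINT, name STRING)
  DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
  WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/bucketed',
    'format' = 'parquet',
    'sink.bucket-hash-function' = 'hive'  -- hypothetical option, not an existing one
  );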


Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In 
the past years, we could enable use cases also without this clause, so 
we should be careful with overloading it with too much functionality in the 
first version. We can still iterate on it later, the interfaces are 
flexible enough to support more in the future.


I agree that maybe an explicit HASH and RANGE doesn't harm. Also making 
the bucket number optional.


I updated the FLIP accordingly. Now the SupportsBucketing interface 
declares more methods that help in validation and providing helpful error 
messages to users.


Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but 
not in

the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, 
StarRocks[1]

and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector itself should perform the
keyBy. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
    (
  uid BIGINT,
  name STRING
    )
    DISTRIBUTED BY (uid) INTO 6 BUCKETS
    WITH (
  'connector' = 'kafka'
    )

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause









Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Yunfan and Benchao,

it seems the FLIP discussion thread got split into two parts. At least 
this is what I see in my mail program. I would kindly ask to answer in 
the other thread [1].


I will also reply there now to maintain the discussion link.

Regards,
Timo

[1] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1



On 28.10.23 10:34, Benchao Li wrote:

Thanks Timo for preparing the FLIP.

Regarding "By default, DISTRIBUTED BY assumes a list of columns for an
implicit hash partitioning."
Do you think it's useful to add some extensibility for the hash
strategy? One scenario I can foresee is writing bucketed data into
Hive: if Flink's hash strategy is different from Hive/Spark's,
they could not utilize the bucketed data written by Flink. This
is a case I have already met in production, and there may be more
cases like this that need a customized hash strategy to accommodate
existing systems.

yunfan zhang  于2023年10月27日周五 19:06写道:


Distribute by in DML is also supported by Hive.
And it is also useful for Flink.
Users can use this ability to increase the cache hit rate in lookup joins.
Users can also use "distribute by key, rand(1, 10)" to avoid data skew problems.
I think it is another way to solve FLIP-204 [1].
Some people have already requested this feature [2].

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] https://issues.apache.org/jira/browse/FLINK-27541

On 2023/10/27 08:20:25 Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector itself should perform the
keyBy. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
(
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause











Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Jark,

my intention was to avoid too complex syntax in the first version. In 
the past years, we could enable use cases also without this clause, so 
we should be careful with overloading it with too much functionality in the 
first version. We can still iterate on it later, the interfaces are 
flexible enough to support more in the future.


I agree that maybe an explicit HASH and RANGE doesn't harm. Also making 
the bucket number optional.
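
For illustration, the variants discussed would look something like this (a sketch
based on this thread; the FLIP defines the final grammar):

-- implicit hash distribution
CREATE TABLE t1 (uid BIGINT, name STRING)
  DISTRIBUTED BY (uid) INTO 6 BUCKETS
  WITH ('connector' = 'kafka');

-- explicit algorithm
CREATE TABLE t2 (uid BIGINT, name STRING)
  DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
  WITH ('connector' = 'kafka');

-- bucket count omitted, left to the connector
CREATE TABLE t3 (uid BIGINT, name STRING)
  DISTRIBUTED BY RANGE(uid)
  WITH ('connector' = 'kafka');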


I updated the FLIP accordingly. Now the SupportsBucketing interface 
declares more methods that help in validation and providing helpful error 
messages to users.


Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector itself should perform the
keyBy. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
(
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause







[DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-26 Thread Timo Walther

Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY 
clause [1].


Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
Clustering. This FLIP continues the work of previous FLIPs and would 
like to introduce the concept of "Bucketing" to Flink.


This is a pure connector characteristic and helps both Apache Kafka and 
Apache Paimon connectors in avoiding a complex WITH clause by providing 
improved syntax.


Here is an example:

CREATE TABLE MyTable
  (
uid BIGINT,
name STRING
  )
  DISTRIBUTED BY (uid) INTO 6 BUCKETS
  WITH (
'connector' = 'kafka'
  )

The full syntax specification can be found in the document. The clause 
should be optional and fully backwards compatible.


Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause


[jira] [Created] (FLINK-33183) Enable metadata columns in NduAnalyzer with retract if non-virtual

2023-10-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-33183:


 Summary: Enable metadata columns in NduAnalyzer with retract if 
non-virtual
 Key: FLINK-33183
 URL: https://issues.apache.org/jira/browse/FLINK-33183
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Currently, the NduAnalyzer is very strict about metadata columns in updating 
sources. Compared to append and upsert sources (see also FLINK-33182), retract 
sources are tricky. And the analyzer is actually correct.

However, for retract sources we should expose more functionality to the user 
and add a warning to the documentation that retract mode could potentially 
cause NDU problems if not enough attention is paid. We should only throw an 
error on virtual metadata columns. Persisted metadata columns can be considered 
"safe". When a metadata column is persisted, we can assume that an upstream 
Flink job fills its content and thus likely also fills its retraction correctly.
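
For example (a sketch with illustrative names; the Kafka 'timestamp' metadata key
is used for demonstration):

CREATE TABLE retract_source (
  id BIGINT,
  v STRING,
  -- persisted metadata column: written by an upstream Flink job, so its
  -- retraction is assumed to be filled consistently -> no error
  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
  -- a VIRTUAL variant of the same column would still be rejected in retract mode
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);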



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-33182:


 Summary: Allow metadata columns in NduAnalyzer with 
ChangelogNormalize
 Key: FLINK-33182
 URL: https://issues.apache.org/jira/browse/FLINK-33182
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Currently, the NduAnalyzer is very strict about metadata columns in updating 
sources. However, for upsert sources (like Kafka) that contain an incomplete 
changelog, the planner always adds a ChangelogNormalize node. 
ChangelogNormalize will make sure that metadata columns can be considered 
deterministic. So the NduAnalyzer should be satisfied in this case. 
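
For example (a sketch with illustrative names):

CREATE TABLE upsert_source (
  id BIGINT,
  v STRING,
  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
-- The planner adds ChangelogNormalize for the incomplete changelog, which makes
-- the metadata column deterministic for downstream operators.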



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33169) Window TVFs don't consider column expansion

2023-09-29 Thread Timo Walther (Jira)
Timo Walther created FLINK-33169:


 Summary: Window TVFs don't consider column expansion
 Key: FLINK-33169
 URL: https://issues.apache.org/jira/browse/FLINK-33169
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


Window TVFs don't consider the column expansion. The reason for this is that 
`TABLE t` is expanded by custom logic in the parser. The expansion logic 
should consider the descriptor in this case.
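
For reference, an affected invocation where the parser expands `TABLE t`
(illustrative table and column names):

SELECT window_start, window_end, SUM(amount)
FROM TABLE(
  TUMBLE(TABLE Orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;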



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-09-05 Thread Timo Walther

Hi everyone,

The voting time for [VOTE] FLIP-348: Make expanding behavior of virtual 
metadata columns configurable[1] has passed. I'm closing the vote now.


There were 6 +1 votes, all were binding:

- Martijn Visser (binding)
- Benchao Li (binding)
- Godfrey He (binding)
- Sergey Nuyanzin (binding)
- Jing Ge (binding)
- Jark Wu (binding)

There were no -1 votes.

Thus, FLIP-348 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

[1] https://lists.apache.org/thread/lc2h6xpk25bmvm4c5pgtfggm920j2ckr

Cheers,
Timo



On 31.08.23 14:29, Jark Wu wrote:

+1 (binding)

Best,
Jark


2023年8月31日 18:54,Jing Ge  写道:

+1(binding)

On Thu, Aug 31, 2023 at 11:22 AM Sergey Nuyanzin 
wrote:


+1 (binding)

On Thu, Aug 31, 2023 at 9:28 AM Benchao Li  wrote:


+1 (binding)

Martijn Visser  于2023年8月31日周四 15:24写道:


+1 (binding)

On Thu, Aug 31, 2023 at 9:09 AM Timo Walther 

wrote:



Hi everyone,

I'd like to start a vote on FLIP-348: Make expanding behavior of

virtual

metadata columns configurable [1] which has been discussed in this
thread [2].

The vote will be open for at least 72 hours unless there is an

objection

or not enough votes.

[1] https://cwiki.apache.org/confluence/x/_o6zDw
[2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy

Cheers,
Timo





--

Best,
Benchao Li




--
Best regards,
Sergey







[jira] [Created] (FLINK-33028) FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-09-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-33028:


 Summary: FLIP-348: Make expanding behavior of virtual metadata 
columns configurable
 Key: FLINK-33028
 URL: https://issues.apache.org/jira/browse/FLINK-33028
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


Many SQL vendors expose additional metadata via so-called "pseudo columns" or 
"system columns" next to the physical columns.

However, those columns should not be selected by default when expanding SELECT *, 
also for the sake of backward compatibility. Flink SQL already offers 
pseudo columns next to the physical columns, exposed as metadata columns.

This proposal suggests evolving the existing column design slightly to be more 
useful for platform providers.

https://cwiki.apache.org/confluence/x/_o6zDw




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Timo Walther

Hi everyone,

I'd like to start a vote on FLIP-348: Make expanding behavior of virtual 
metadata columns configurable [1] which has been discussed in this 
thread [2].


The vote will be open for at least 72 hours unless there is an objection 
or not enough votes.


[1] https://cwiki.apache.org/confluence/x/_o6zDw
[2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy

Cheers,
Timo


Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-29 Thread Timo Walther

Thanks everyone for the positive feedback.

I updated the FLIP with the proposed minimal solution:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-348%3A+Make+expanding+behavior+of+virtual+metadata+columns+configurable

I also changed the title to not cause any confusion with the concept of 
system columns.


Now the FLIP only introduces a new ConfigOption and does not specify 
constraints on the naming of metadata columns anymore.


If there are no objections, I would start a voting thread by tomorrow.

Thanks,
Timo


On 29.08.23 00:21, Alexey Leonov-Vendrovskiy wrote:

SGTM

For future reference, responding here:

5) Any operator can introduce system (pseudo) columns.


This is clearly out of scope for this FLIP. The implementation effort
would be huge and could introduce a lot of bugs.



  I didn't imply any specific implementation or feature coverage in the
currently proposed FLIP, but rather as a way to describe the semantics of
system columns that those could be (but don't have to) introduced by any
operator.

Thanks,
Alexey


On Tue, Aug 22, 2023 at 2:18 PM Jing Ge  wrote:


Hi Timo,

Your last suggestion sounds good.

Best regards,
Jing

On Mon, Aug 21, 2023 at 4:21 AM Benchao Li  wrote:


It sounds good to me too, that we avoid introducing the concept of

"system

columns" for now.

Timo Walther  于2023年8月18日周五 22:38写道:


Great, I also like my last suggestion as it is even more elegant. I

will

update the FLIP by Monday.

Regards,
Timo

On 17.08.23 13:55, Jark Wu wrote:

Hi Timo,

I'm fine with your latest suggestion that introducing a flag to

control

expanding behavior of metadata virtual columns, but not introducing
any concept of system/pseudo columns for now.

Best,
Jark

On Tue, 15 Aug 2023 at 23:25, Timo Walther 

wrote:



Hi everyone,

I would like to bump this thread up again.

Esp. I would like to hear opinions on my latest suggestions to

simply

use METADATA VIRTUAL as system columns and only introduce a config
option for the SELECT * behavior. Implementation-wise this means

minimal

effort and less new concepts.

Looking forward to any kind of feedback.

Thanks,
Timo

On 07.08.23 12:07, Timo Walther wrote:

Hi everyone,

thanks for the various feedback and lively discussion. Sorry, for

the

late reply as I was on vacation. Let me answer to some of the

topics:


1) System columns should not be shown with DESCRIBE statements

This sounds fine to me. I will update the FLIP in the next

iteration.


2) Do you know why most SQL systems do not need any prefix with

their

pseudo column?

Because most systems do not have external catalogs or connectors.

And

also the number of system columns is limited to a handful of

columns.

Flink is more generic and thus more complex. And we have already

the

concepts of metadata columns. We need to be careful with not

overloading

our language.

3) Implementation details

   > how do you plan to implement the "system columns", do we need

to

add

it to `RelNode` level? Or we just need to do it in the
parsing/validating phase?
   > I'm not sure that Calcite's "system column" feature is fully

ready


My plan would be to only modify the parsing/validating phase. I

would

like to avoid additional complexity in planner rules and
connector/catalog interfaces. Metadata columns already support
projection push down and are passed through the stack (via Schema,
ResolvedSchema, SupportsReadableMetadata). Calcite's "system

column"

feature is not fully ready yet and it would be a large effort
potentially introducing bugs in supporting it. Thus, I'm proposing

to

leverage what we already have. The only part that needs to be

modified

is the "expand star" method in SqlValidator and Table API.

Queries such as `SELECT * FROM (SELECT $rowtime, * FROM t);` would

show

$rowtime as the expand star has only a special case when in the

scope

for `FROM t`. All further subqueries treat it as a regular column.

4) Built-in defined pseudo-column "$rowtime"

   > Did you consider making it as a built-in defined pseudo-column
"$rowtime" which returns the time attribute value (if exists) or

null

(if non-exists) for every table/query, and pseudo-column

"$proctime"

always returns PROCTIME() value for each table/query

Built-in pseudo-columns mean that connector or catalog providers

need

consensus in Flink which pseudo-columns should be built-in. We

should

keep the concept generic and let platform providers decide which

pseudo

columns to expose. $rowtime might be obvious but others such as
$partition or $offset are tricky to get consensus as every external
connector works differently. Also a connector might want to expose
different time semantics (such as ingestion time).

5) Any operator can introduce system (pseudo) columns.

This is clearly out of scope for this FLIP. The implementation

effort

would be huge and could introduce a lot of bugs.


Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-18 Thread Timo Walther

> lots of the streaming SQL syntax are extensions of SQL standard

That is true. But hints are kind of a special case because they are not 
even "part of Flink SQL" that's why they are written in a comment syntax.


Anyway, I feel hints can sometimes be confusing for users because most 
of them have no effect for streaming, and long-term we could also set 
some hints via the CompiledPlan. And if you have multiple teams, 
less experienced users should not play around with hints and should leave the 
decision to the system, which might become smarter over time.


Regards,
Timo


On 17.08.23 18:47, liu ron wrote:

Hi, Bonnie


Options hints could be a security concern since users can override

settings.

I think this still doesn't answer my question

Best,
Ron

Jark Wu  于2023年8月17日周四 19:51写道:


Sorry, I still don't understand why we need to disable the query hint.
It doesn't have the security problems as options hint. Bonnie said it
could affect performance, but that depends on users using it explicitly.
If there is any performance problem, users can remove the hint.

If we want to disable query hint just because it's an extension to SQL
standard.
I'm afraid we have to introduce a bunch of configuration, because lots of
the streaming SQL syntax are extensions of SQL standard.

Best,
Jark

On Thu, 17 Aug 2023 at 15:43, Timo Walther  wrote:


+1 for this proposal.

Not every data team would like to enable hints, also because they are an
extension to the SQL standard. It might also be the case that custom
rules would be overwritten otherwise. Setting hints could also be the
exclusive task of a DevOps team.

Regards,
Timo


On 17.08.23 09:30, Konstantin Knauf wrote:

Hi Bonnie,

this makes sense to me, in particular, given that we already have this
toggle for a different type of hints.

Best,

Konstantin

Am Mi., 16. Aug. 2023 um 19:38 Uhr schrieb Bonnie Arogyam Varghese
:


Hi Liu,
   Options hints could be a security concern since users can override
settings. However, query hints specifically could affect performance.
Since we have a config to disable Options hint, I'm suggesting we also

have

a config to disable Query hints.

On Wed, Aug 16, 2023 at 9:41 AM liu ron  wrote:


Hi,

Thanks for driving this proposal.

Can you explain why you would need to disable query hints because of
security issues? I don't really understand why query hints affects
security.

Best,
Ron

Bonnie Arogyam Varghese 

于2023年8月16日周三

23:59写道:


Platform providers may want to disable hints completely for security
reasons.

Currently, there is a configuration to disable OPTIONS hint -









https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled


However, there is no configuration available to disable QUERY hints

-










https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints


The proposal is to add a new configuration:

Name: table.query-options.enabled
Description: Enable or disable QUERY hints; if disabled, an
exception would be thrown if any QUERY hints are specified.
Note: The default value will be set to true.
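
For reference, a query hint of the kind that the new option would gate
(illustrative; BROADCAST is one of the existing join hints):

-- SET 'table.query-options.enabled' = 'false';  -- proposed option, name not final
SELECT /*+ BROADCAST(dim) */ fact.id, dim.name
FROM fact
JOIN dim ON fact.dim_id = dim.id;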



















Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-18 Thread Timo Walther
Great, I also like my last suggestion as it is even more elegant. I will 
update the FLIP by Monday.


Regards,
Timo

On 17.08.23 13:55, Jark Wu wrote:

Hi Timo,

I'm fine with your latest suggestion that introducing a flag to control
expanding behavior of metadata virtual columns, but not introducing
any concept of system/pseudo columns for now.

Best,
Jark

On Tue, 15 Aug 2023 at 23:25, Timo Walther  wrote:


Hi everyone,

I would like to bump this thread up again.

Esp. I would like to hear opinions on my latest suggestions to simply
use METADATA VIRTUAL as system columns and only introduce a config
option for the SELECT * behavior. Implementation-wise this means minimal
effort and less new concepts.

Looking forward to any kind of feedback.

Thanks,
Timo

On 07.08.23 12:07, Timo Walther wrote:

Hi everyone,

thanks for the various feedback and lively discussion. Sorry for the
late reply as I was on vacation. Let me answer some of the topics:

1) System columns should not be shown with DESCRIBE statements

This sounds fine to me. I will update the FLIP in the next iteration.

2) Do you know why most SQL systems do not need any prefix with their
pseudo column?

Because most systems do not have external catalogs or connectors. And
also the number of system columns is limited to a handful of columns.
Flink is more generic and thus more complex. And we have already the
concepts of metadata columns. We need to be careful with not overloading
our language.

3) Implementation details

  > how do you plan to implement the "system columns", do we need to add
it to `RelNode` level? Or we just need to do it in the
parsing/validating phase?
  > I'm not sure that Calcite's "system column" feature is fully ready

My plan would be to only modify the parsing/validating phase. I would
like to avoid additional complexity in planner rules and
connector/catalog interfaces. Metadata columns already support
projection push down and are passed through the stack (via Schema,
ResolvedSchema, SupportsReadableMetadata). Calcite's "system column"
feature is not fully ready yet and it would be a large effort
potentially introducing bugs in supporting it. Thus, I'm proposing to
leverage what we already have. The only part that needs to be modified
is the "expand star" method in SqlValidator and Table API.

Queries such as `SELECT * FROM (SELECT $rowtime, * FROM t);` would show
$rowtime as the expand star has only a special case when in the scope
for `FROM t`. All further subqueries treat it as a regular column.

4) Built-in defined pseudo-column "$rowtime"

  > Did you consider making it as a built-in defined pseudo-column
"$rowtime" which returns the time attribute value (if exists) or null
(if non-exists) for every table/query, and pseudo-column "$proctime"
always returns PROCTIME() value for each table/query

Built-in pseudo-columns mean that connector or catalog providers need
consensus in Flink which pseudo-columns should be built-in. We should
keep the concept generic and let platform providers decide which pseudo
columns to expose. $rowtime might be obvious but others such as
$partition or $offset are tricky to get consensus as every external
connector works differently. Also a connector might want to expose
different time semantics (such as ingestion time).

5) Any operator can introduce system (pseudo) columns.

This is clearly out of scope for this FLIP. The implementation effort
would be huge and could introduce a lot of bugs.

6) "Metadata Key Prefix Constraint" which is still a little complex

Another option could be to drop the naming pattern constraint. We could
make it configurable that METADATA VIRTUAL columns are never selected by
default in SELECT * or visible in DESCRIBE. This would further simplify
the FLIP and esp lower the impact on the planner and all interfaces.

What do you think about this? We could introduce a flag:

table.expand-metadata-columns (better name to be defined)

This way we don't need to introduce the concept of system columns yet,
but can still offer similar functionality with minimal overhead in the
code base.
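
To make this more concrete, here is a minimal sketch (inside a main() with the
usual flink-table imports) of how such a setup could look. The option key is
only the placeholder name from above and not an existing configuration, and
the Kafka connector and the `$rowtime` naming are used purely for illustration:

```
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Placeholder option from this proposal (final name still to be defined):
// when disabled, METADATA VIRTUAL columns would no longer be expanded by SELECT *.
tEnv.getConfig().getConfiguration().setString("table.expand-metadata-columns", "false");

// A catalog or platform provider could expose a "system column" simply as a
// virtual metadata column with a naming pattern of its choice.
tEnv.executeSql(
    "CREATE TABLE t ("
        + "  id BIGINT,"
        + "  `$rowtime` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL"
        + ") WITH ("
        + "  'connector' = 'kafka',"
        + "  'topic' = 't',"
        + "  'properties.bootstrap.servers' = 'localhost:9092',"
        + "  'format' = 'json'"
        + ")");

// SELECT * would then only return `id`; the system column has to be selected explicitly.
tEnv.executeSql("SELECT `$rowtime`, * FROM t").print();
```

With the flag left at its default, the behavior would stay exactly as today.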

Regards,
Timo




On 04.08.23 23:06, Alexey Leonov-Vendrovskiy wrote:

Looks like both kinds of system columns can converge.
We can say that any operator can introduce system (pseudo) columns.

cc Eugene who is also interested in the subject.

On Wed, Aug 2, 2023 at 1:03 AM Paul Lam  wrote:


Hi Timo,

Thanks for starting the discussion! System columns are no doubt a
good boost to Flink SQL's usability, and I see the feedback mainly
concerns the accessibility of system columns.

I think most of the concerns could be solved by clarifying the
ownership of the system columns. Different from databases like
Oracle/BigQuery/PG, which own the data/metadata, Flink uses the
data/metadata from external systems. That means Flink could
have 2 kinds of system columns (take ROWID for example

Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-17 Thread Timo Walther

+1 for this proposal.

Not every data team would like to enable hints, also because they are an
extension to the SQL standard. It might also be the case that custom
rules would otherwise be overwritten. Setting hints could also be the
exclusive task of a DevOps team.


Regards,
Timo


On 17.08.23 09:30, Konstantin Knauf wrote:

Hi Bonnie,

this makes sense to me, in particular, given that we already have this
toggle for a different type of hints.

Best,

Konstantin

Am Mi., 16. Aug. 2023 um 19:38 Uhr schrieb Bonnie Arogyam Varghese
:


Hi Liu,
  OPTIONS hints could be a security concern since users can override
settings. Query hints, on the other hand, could affect performance.
Since we have a config to disable the OPTIONS hint, I'm suggesting we also
have a config to disable query hints.

On Wed, Aug 16, 2023 at 9:41 AM liu ron  wrote:


Hi,

Thanks for driving this proposal.

Can you explain why you would need to disable query hints for
security reasons? I don't really understand why query hints affect
security.

Best,
Ron

Bonnie Arogyam Varghese  于2023年8月16日周三
23:59写道:


Platform providers may want to disable hints completely for security
reasons.

Currently, there is a configuration to disable OPTIONS hint -





https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled


However, there is no configuration available to disable QUERY hints -





https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints


The proposal is to add a new configuration:

Name: table.query-options.enabled
Description: Enable or disable QUERY hints; if disabled, an
exception will be thrown if any QUERY hints are specified.
Note: The default value will be set to true.
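
To illustrate the intended usage (the option key is the one proposed above and
does not exist yet; the joined tables are assumed to be registered already):

```
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Proposed option; the default would be 'true', i.e. query hints stay enabled.
tEnv.getConfig().getConfiguration().setString("table.query-options.enabled", "false");

// With the option disabled, a join hint like the one below would be rejected
// with an exception instead of being applied.
tEnv.executeSql(
    "SELECT /*+ BROADCAST(dim) */ f.id, dim.name "
        + "FROM facts AS f JOIN dim ON f.id = dim.id");
```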












Re: Plans for Schema Evolution in Table API

2023-08-15 Thread Timo Walther

Hi Ashish,

sorry for the late reply. There are currently no concrete plans to 
support schema evolution in the Table API. Until recently, Flink version 
evolution was the biggest topic. In the near future we can revisit 
query and state evolution in more detail.


Personally, I think we will need either some kind of more flexible data 
type (similar to the JSON type in Postgres) or user-defined types 
(UDT) to ensure a smooth experience.


For now, warming up the state is the only viable solution until internal 
serializers are more flexible.


Regards,
Timo

On 14.08.23 16:55, Ashish Khatkar wrote:

Bumping the thread.

On Fri, Aug 4, 2023 at 12:51 PM Ashish Khatkar  wrote:


Hi all,

We are using the Flink 1.17.0 Table API with RocksDB as the state backend to
provide a service for our users to run SQL queries. The tables are created
using an Avro schema, and when the schema is changed in a compatible manner,
i.e. adding a field with a default, we are unable to recover the job from the
savepoint. This is mentioned in the Flink docs on evolution [1] as well.

Are there any plans to support schema evolution in the Table API? Our
current approach involves rebuilding the entire state by discarding the
output and then utilizing that state in the actual job. This is already
done for table-store [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store









Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-15 Thread Timo Walther

Hi everyone,

I would like to bump this thread up again.

Especially, I would like to hear opinions on my latest suggestion to simply
use METADATA VIRTUAL columns as system columns and only introduce a config
option for the SELECT * behavior. Implementation-wise this means minimal
effort and fewer new concepts.


Looking forward to any kind of feedback.

Thanks,
Timo

On 07.08.23 12:07, Timo Walther wrote:

Hi everyone,

thanks for the various feedback and lively discussion. Sorry for the
late reply, as I was on vacation. Let me answer some of the topics:


1) System columns should not be shown with DESCRIBE statements

This sounds fine to me. I will update the FLIP in the next iteration.

2) Do you know why most SQL systems do not need any prefix with their 
pseudo column?


Because most systems do not have external catalogs or connectors, and the
number of system columns is limited to a handful. Flink is more generic and
thus more complex, and we already have the concept of metadata columns. We
need to be careful not to overload our language.


3) Implementation details

 > how do you plan to implement the "system columns"? Do we need to add
it at the `RelNode` level, or do we just need to do it in the
parsing/validating phase?

 > I'm not sure that Calcite's "system column" feature is fully ready

My plan would be to only modify the parsing/validating phase. I would
like to avoid additional complexity in planner rules and
connector/catalog interfaces. Metadata columns already support
projection push down and are passed through the stack (via Schema,
ResolvedSchema, SupportsReadableMetadata). Calcite's "system column"
feature is not fully ready yet, and supporting it would be a large effort
that could potentially introduce bugs. Thus, I'm proposing to
leverage what we already have. The only part that needs to be modified
is the "expand star" method in SqlValidator and Table API.


Queries such as `SELECT * FROM (SELECT $rowtime, * FROM t);` would show
$rowtime, as the star expansion only has a special case in the scope of
`FROM t`; all further subqueries treat it as a regular column.


4) Built-in defined pseudo-column "$rowtime"

 > Did you consider making it as a built-in defined pseudo-column 
"$rowtime" which returns the time attribute value (if exists) or null 
(if non-exists) for every table/query, and pseudo-column "$proctime" 
always returns PROCTIME() value for each table/query


Built-in pseudo-columns mean that connector or catalog providers need
consensus in Flink on which pseudo-columns should be built-in. We should
keep the concept generic and let platform providers decide which pseudo
columns to expose. $rowtime might be obvious, but others such as
$partition or $offset are tricky to agree on, as every external
connector works differently. Also, a connector might want to expose
different time semantics (such as ingestion time).


5) Any operator can introduce system (pseudo) columns.

This is clearly out of scope for this FLIP. The implementation effort 
would be huge and could introduce a lot of bugs.


6) "Metadata Key Prefix Constraint" which is still a little complex

Another option could be to drop the naming pattern constraint. We could 
make it configurable that METADATA VIRTUAL columns are never selected by 
default in SELECT * or visible in DESCRIBE. This would further simplify 
the FLIP and esp lower the impact on the planner and all interfaces.


What do you think about this? We could introduce a flag:

table.expand-metadata-columns (better name to be defined)

This way we don't need to introduce the concept of system columns yet, 
but can still offer similar functionality with minimal overhead in the 
code base.


Regards,
Timo




On 04.08.23 23:06, Alexey Leonov-Vendrovskiy wrote:

Looks like both kinds of system columns can converge.
We can say that any operator can introduce system (pseudo) columns.

cc Eugene who is also interested in the subject.

On Wed, Aug 2, 2023 at 1:03 AM Paul Lam  wrote:


Hi Timo,

Thanks for starting the discussion! System columns are no doubt a
good boost to Flink SQL's usability, and I see the feedback mainly
concerns the accessibility of system columns.

I think most of the concerns could be solved by clarifying the
ownership of the system columns. Different from databases like
Oracle/BigQuery/PG, which own the data/metadata, Flink uses the
data/metadata from external systems. That means Flink could
have 2 kinds of system columns (take ROWID for example):

1. system columns provided by external systems via catalogs, such
 as ROWID from the original system.
2. system columns generated by Flink, such as ROWID generated by
 Flink itself.

IIUC, the FLIP is proposing the 1st approach: the catalog defines what
system columns to provide, and Flink treats them as normal columns
with a special naming pattern.

On the oth

Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-07 Thread Timo Walther
tems columns should not be shown with
DESCRIBE statements.

WDYT? Thanks!

Best,
Paul Lam


2023年7月31日 23:54,Jark Wu  写道:

Hi Timo,

Thanks for your proposal. I think this is a nice feature for users and I
prefer option 3.

I only have one concern about the concept of pseudo-columns or system columns,
because this is the first time we would introduce it in Flink SQL. The
confusion is similar to the questions of Benchao and Sergey about the
propagation of pseudo-columns.

 From my understanding, a pseudo-column can be obtained from an arbitrary
query, just similar to ROWNUM in Oracle [1], such as:

SELECT *
FROM (SELECT * FROM employees ORDER BY employee_id)
WHERE ROWNUM < 11;

However, IIUC, the proposed "$rowtime" pseudo-column can only be obtained from
the physical table and can't be obtained from queries, even if the query
propagates the rowtime attribute. There was also a discussion about adding a
pseudo-column "_proctime" [2] to make lookup joins easier to use, which can be
obtained from arbitrary queries. That "_proctime" may conflict with the
proposed pseudo-column concept.

Did you consider making it a built-in pseudo-column "$rowtime" which returns
the time attribute value (if it exists) or null (if it does not) for every
table/query, and a pseudo-column "$proctime" which always returns the
PROCTIME() value for each table/query? In this way, catalogs only need to
provide a default rowtime attribute and users can access it in the same way.
And we don't need to introduce the contract interface of the "Metadata Key
Prefix Constraint", which is still a little complex for users and devs to
understand.

Best,
Jark

[1]:


https://docs.oracle.com/cd/E11882_01/server.112/e41084/pseudocolumns009.htm#SQLRF00255

[2]: https://lists.apache.org/thread/7ln106qxyw8sp7ljq40hs2p1lb1gdwj5




On Fri, 28 Jul 2023 at 06:18, Alexey Leonov-Vendrovskiy <
vendrov...@gmail.com> wrote:



`SELECT * FROM (SELECT $rowtime, * FROM t);`
Am I right that it will show `$rowtime` in output ?



Yes, all explicitly selected columns become a part of the result (and
intermediate) schema, and hence propagate.

On Thu, Jul 27, 2023 at 2:40 PM Alexey Leonov-Vendrovskiy <
vendrov...@gmail.com> wrote:


Thank you, Timo, for starting this FLIP!

I propose the following change:

Remove the requirement that DESCRIBE need to show system columns.


Some concrete vendor-specific catalog implementations might prefer this
approach. Usually the same system columns are available on all tables (or a
family of tables), and this can be easily captured in the documentation.

For example, BigQuery does exactly this: there, pseudo-columns do not show
up in the table schema in any place, but can be accessed via reference.

So I propose we:
a) either say that DESCRIBE doesn't show system columns,
b) or leave this vendor-specific / configurable via a flag (if needed).


Regards,
Alexey

On Thu, Jul 27, 2023 at 3:27 AM Sergey Nuyanzin 
wrote:


Hi Timo,

Thanks for the FLIP.
I also tend to think that Option 3 is better.

I would also be interested in the question mentioned by Benchao Li,
and a similar question about nested queries like
`SELECT * FROM (SELECT $rowtime, * FROM t);`
Am I right that it will show `$rowtime` in the output?


On Thu, Jul 27, 2023 at 6:58 AM Benchao Li 

wrote:



Hi Timo,

Thanks for the FLIP, I also like the idea, and option 3 sounds good to me.

I would like to discuss a case which is not mentioned in the current FLIP:
how are the "system columns" expressed in intermediate results, e.g. a join?
E.g. for `SELECT * FROM t1 JOIN t2`, I guess it should not include "system
columns" from t1 and t2 as you proposed, and `SELECT t1.$rowtime, * FROM t1
JOIN t2` should also be valid.
Then the question is how do you plan to implement the "system columns": do
we need to add it at the `RelNode` level, or do we just need to do it in the
parsing/validating phase?
I'm not sure that Calcite's "system column" feature is fully ready for this,
since the code for this part was imported from the earlier project before it
got into Apache and has not been considered much in the past development.


Jing Ge  于2023年7月26日周三 00:01写道:


Hi Timo,

Thanks for your proposal. It is a very pragmatic feature. Among all options
in the FLIP, option 3 is the one I prefer too, and I'd like to ask some
questions to understand your thoughts.

1. I did some research on pseudo columns; just out of curiosity, do you
know why most SQL systems do not need any prefix for their pseudo columns?

2. Some platform providers use ${variable_name} to define their own
configurations and allow them to be embedded into SQL scripts. Will there
be any conflict with option 3?

Best regards,
Jing

On Tue, Jul 25, 2023 at 7:00 PM Konstantin Knauf 


wrote:


Hi Timo,

this makes sense to me. Option 3 seems reasonable, too.

Cheers,

Konstantin

Am Di., 25. Juli 2023 um 12:53 Uhr schrieb Timo Walther <
twa

[jira] [Created] (FLINK-32689) Insufficient validation for table.local-time-zone

2023-07-26 Thread Timo Walther (Jira)
Timo Walther created FLINK-32689:


 Summary: Insufficient validation for table.local-time-zone
 Key: FLINK-32689
 URL: https://issues.apache.org/jira/browse/FLINK-32689
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


There are still cases where timezone information is lost silently due to the 
interaction between {{java.util.TimeZone}} and {{java.time.ZoneId}}.

This might be a theoretical problem, but I would feel safer if we change the
check to:
{code}
if (!java.util.TimeZone.getTimeZone(zoneId).toZoneId().equals(ZoneId.of(zoneId))) {
    throw new ValidationException(errorMessage);
}
{code}
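
For context, a small example of the mismatch described above (the misspelled zone ID is made up for illustration):

{code}
String zoneId = "Europe/Berlinn"; // typo on purpose

// java.util.TimeZone silently falls back to GMT for IDs it does not understand ...
java.util.TimeZone legacy = java.util.TimeZone.getTimeZone(zoneId);
System.out.println(legacy.getID()); // prints "GMT"

// ... whereas java.time.ZoneId rejects them with an exception.
java.time.ZoneId.of(zoneId); // throws ZoneRulesException: Unknown time-zone ID
{code}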



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-07-25 Thread Timo Walther

Hi everyone,

I would like to start a discussion about introducing the concept of 
"System Columns" in SQL and Table API.


The subject sounds bigger than it actually is. Luckily, Flink SQL 
already exposes the concept of metadata columns. And this proposal is 
just a slight adjustment for how metadata columns can be used as system 
columns.


The biggest problem of metadata columns currently is that a catalog 
implementation can't provide them by default because they would affect 
`SELECT *` when adding another one.


Looking forward to your feedback on FLIP-348:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-348%3A+Support+System+Columns+in+SQL+and+Table+API

Thanks,
Timo


[jira] [Created] (FLINK-32657) Revert upgrading ExecNode versions for StateMetadata

2023-07-24 Thread Timo Walther (Jira)
Timo Walther created FLINK-32657:


 Summary: Revert upgrading ExecNode versions for StateMetadata
 Key: FLINK-32657
 URL: https://issues.apache.org/jira/browse/FLINK-32657
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Jane Chan


In theory, introducing a new attribute in an ExecNode requires upgrading the 
version in ExecNodeMetadata. However, since this is currently an experimental 
feature, the attributes that need to be serialized for the final exec node are 
still being iterated on. Upgrading the version would make the serialization 
scheme of the lower version immutable (unless a patch is applied to the old 
version), and the testing framework is not perfect either. Therefore, upgrading 
the ExecNode version is not necessary if state compatibility can be maintained 
at the implementation level.

It should be okay to roll back ExecNodeMetadata to version 1 because 
compatibility handling is enabled at the code level.

Long-term, we need a larger testing framework, per Flink version and per 
ExecNode version, that validates all attributes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS][2.0] FLIP-344: Remove parameter in RichFunction#open

2023-07-24 Thread Timo Walther

+1

But instead we should add an OpenContext parameter there to keep the signature
stable while still being able to add parameters later.
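
A rough sketch of what I mean, purely illustrative and not a final interface:

```
/** Hypothetical, initially empty context; parameters can be added later without breaking open(). */
interface OpenContext {
}

/** Sketch only, not the real interface: how RichFunction#open could look after the change. */
interface RichFunctionSketch {
    void open(OpenContext openContext) throws Exception;
}
```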


Regards,
Timo

On 21.07.23 12:24, Jing Ge wrote:

+1

On Fri, Jul 21, 2023 at 10:22 AM Yuxin Tan  wrote:


+1

Best,
Yuxin


Xintong Song  于2023年7月21日周五 12:04写道:


+1

Best,

Xintong



On Fri, Jul 21, 2023 at 10:52 AM Wencong Liu 

wrote:



Hi devs,

I would like to start a discussion on FLIP-344: Remove parameter in
RichFunction#open [1].

The open() method in RichFunction requires a Configuration instance as an
argument, which is always passed as a new instance without any configuration
parameters in AbstractUdfStreamOperator#open. Thus, it is unnecessary to
include this parameter in the open() method.
As such, I propose to remove the Configuration parameter from
RichFunction#open(Configuration parameters).
Looking forward to your feedback.
[1]




https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231

Best regards,


Wencong Liu










Re: [VOTE] FLIP-346: Deprecate ManagedTable related APIs

2023-07-24 Thread Timo Walther

+1

Regards,
Timo


On 24.07.23 04:00, liu ron wrote:

+1

Best,
Ron

Lincoln Lee  于2023年7月21日周五 16:09写道:


+1

Best,
Lincoln Lee


Leonard Xu  于2023年7月21日周五 16:07写道:


+1

Best,
Leonard


On Jul 21, 2023, at 4:02 PM, yuxia 

wrote:


+1(binging)

Best regards,
Yuxia

- Original Message -
From: "Jane Chan"
To: "dev"
Sent: Friday, July 21, 2023, 3:41:11 PM
Subject: [VOTE] FLIP-346: Deprecate ManagedTable related APIs

Hi developers,

Thanks for all the feedback on FLIP-346: Deprecate ManagedTable related
APIs[1].
Based on the discussion[2], we have reached a consensus, so I would

like

to

start a vote.

The vote will last for at least 72 hours unless there is an objection

or

insufficient votes.

[1]




https://cwiki.apache.org/confluence/display/FLINK/FLIP-346%3A+Deprecate+ManagedTable+related+APIs

[2] https://lists.apache.org/thread/5dvqyhqp5fbtm54944xohts71modwd99

Best,
Jane











[jira] [Created] (FLINK-32613) Disable check for single rowtime attribute for sinks

2023-07-17 Thread Timo Walther (Jira)
Timo Walther created FLINK-32613:


 Summary: Disable check for single rowtime attribute for sinks
 Key: FLINK-32613
 URL: https://issues.apache.org/jira/browse/FLINK-32613
 Project: Flink
  Issue Type: Improvement
Reporter: Timo Walther






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32470) SqlValidateException should be exposed as ValidationException

2023-06-28 Thread Timo Walther (Jira)
Timo Walther created FLINK-32470:


 Summary: SqlValidateException should be exposed as 
ValidationException
 Key: FLINK-32470
 URL: https://issues.apache.org/jira/browse/FLINK-32470
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


{{ValidationException}} is the main exception of the Table API and SQL in case 
the user did something wrong. Most exceptions are wrapped into 
{{ValidationException}}.

Since the parser module has no access to it, it introduces a custom 
{{SqlValidateException}}. However, this should not be exposed to users. It 
should only serve as an intermediate exception that is translated in 
{{FlinkPlannerImpl#validate}}.
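
A sketch of the intended translation (names are illustrative; the actual change would live in {{FlinkPlannerImpl#validate}}):

{code}
try {
    return validator.validate(sqlNode);
} catch (SqlValidateException e) {
    // surface the internal parser exception as the public Table API exception
    throw new ValidationException(e.getMessage(), e);
}
{code}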

{{SqlParserException}} and {{SqlParserEOFException}} could also be simplified 
to {{ValidationException}} but at least they are correctly annotated and 
located in the {{o.a.f.table.api}} package.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32466) Invalid input strategy for CONCAT allows BINARY strings

2023-06-28 Thread Timo Walther (Jira)
Timo Walther created FLINK-32466:


 Summary: Invalid input strategy for CONCAT allows BINARY strings
 Key: FLINK-32466
 URL: https://issues.apache.org/jira/browse/FLINK-32466
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


"string" in SQL terms covers both character strings and binary strings. The 
author of CONCAT might not have known this. In any case, the code gen instead 
of the validator fails when executing:

{code}
TableEnvironment t = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
t.createTemporaryView("t", t.fromValues(lit(new byte[] {97})));
t.executeSql("SELECT CONCAT(f0, '-magic') FROM t").print();
{code}

As future work, we should also allow binary strings.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-25 Thread Timo Walther

Also: How do we want to query the most recent version of a table?

`AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs the type
is TIMESTAMP_LTZ and, what is even more concerning, it is actually evaluated
per row:


> Returns the current SQL timestamp in the local time zone, the return 
type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming 
mode. But in batch mode, it is evaluated once as the query starts and 
uses the same result for every row.


This could make it difficult to explain in a join scenario of multiple 
snapshotted tables.
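
For reference, the kind of statement this boils down to, with the syntax as
currently proposed in the FLIP (table name and timestamp are just examples,
details may still change):

```
// Querying a snapshot at a fixed point in time:
tEnv.executeSql(
    "SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00'").print();

// The open question above, spelled out:
// SELECT * FROM orders FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP
```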


Regards,
Timo


On 25.05.23 12:29, Timo Walther wrote:

Hi Feng,

thanks for proposing this FLIP. It makes a lot of sense to finally 
support querying tables at a specific point in time, and hopefully also 
ranges soon, following time-versioned tables.


Here is some feedback from my side:

1. Syntax

Can you elaborate a bit on the Calcite restrictions?

Does Calcite currently support `AS OF` syntax for this but not `FOR 
SYSTEM_TIME AS OF`?


It would be great to support `AS OF` also for time-versioned joins and 
have a unified and short syntax.


Once a fix is merged in Calcite for this, we can make this available in 
Flink earlier by copying the corresponding classes until the next 
Calcite upgrade is performed.


2. Semantics

How do we interpret the timestamp? In Flink we have 2 timestamp types 
(TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP 
'2023-04-27 00:00:00', in which time zone will the timestamp be? Will we 
convert it to TIMESTAMP_LTZ?


We definitely need to clarify this because the past has shown that 
daylight saving time makes our lives hard.


Thanks,
Timo

On 25.05.23 10:57, Feng Jin wrote:

Hi, everyone.

I’d like to start a discussion about FLIP-308: Support Time Travel In 
Batch

Mode [1]


Time travel is a SQL syntax used to query historical versions of data. It
allows users to specify a point in time and retrieve the data and 
schema of

a table as it appeared at that time. With time travel, users can easily
analyze and compare historical versions of data.


With the widespread use of data lake systems such as Paimon, Iceberg, and
Hudi, time travel can provide more convenience for users' data analysis.


Looking forward to your opinions, any suggestions are welcomed.



1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode



Best.

Feng







Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-25 Thread Timo Walther

Hi Feng,

thanks for proposing this FLIP. It makes a lot of sense to finally 
support querying tables at a specific point in time, and hopefully also 
ranges soon, following time-versioned tables.


Here is some feedback from my side:

1. Syntax

Can you elaborate a bit on the Calcite restrictions?

Does Calcite currently support `AS OF` syntax for this but not `FOR 
SYSTEM_TIME AS OF`?


It would be great to support `AS OF` also for time-versioned joins and 
have a unified and short syntax.


Once a fix is merged in Calcite for this, we can make this available in 
Flink earlier by copying the corresponding classes until the next 
Calcite upgrade is performed.


2. Semantics

How do we interpret the timestamp? In Flink we have 2 timestamp types 
(TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP 
'2023-04-27 00:00:00', in which time zone will the timestamp be? Will we 
convert it to TIMESTAMP_LTZ?


We definitely need to clarify this because the past has shown that 
daylight saving time makes our lives hard.


Thanks,
Timo

On 25.05.23 10:57, Feng Jin wrote:

Hi, everyone.

I’d like to start a discussion about FLIP-308: Support Time Travel In Batch
Mode [1]


Time travel is a SQL syntax used to query historical versions of data. It
allows users to specify a point in time and retrieve the data and schema of
a table as it appeared at that time. With time travel, users can easily
analyze and compare historical versions of data.


With the widespread use of data lake systems such as Paimon, Iceberg, and
Hudi, time travel can provide more convenience for users' data analysis.


Looking forward to your opinions, any suggestions are welcomed.



1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode



Best.

Feng





Re: [External] [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-04-03 Thread Timo Walther

Hi Jane,

thanks for proposing this FLIP. More state insights and fine-grained 
state TTL are a frequently requested feature for Flink SQL. Eventually, 
we need to address this.


I agree with the previous responses that doing this with a hint might 
cause more confusion than it actually helps. We should use hints only if 
they can be placed close to an operation (e.g. a JOIN or a table), and only 
where a global flag for the entire query via SET is not sufficient.


In general, I support the current direction of the FLIP and continuing 
the vision of FLIP-190. However, actually fine-grained state TTL should 
already be possible today. Maybe this is untested yet, but we largely 
reworked how configuration works within the planner in Flink 1.15.


As you briefly mentioned in the FLIP, ExecNodeConfig [1] already combines 
configuration coming from TableConfig with per-ExecNode config. 
Actually, state TTL from the JSON plan should already have higher precedence 
than TableConfig.
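
For completeness, the coarse-grained knob that exists today is the
pipeline-wide TTL on TableConfig (existing option key; sketch assuming the
usual Table API setup). The per-operator TTL discussed here would refine
exactly this:

```
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Global state TTL for all stateful operators of the pipeline.
tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "12 h");
```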


It would be great to extend the meta-information of ExecNodes with state 
insights. I don't fully understand where your proposed StateMetadata would be 
located. Would this be a value of @ExecNodeMetadata, StreamExecNode, or 
TwoInputStreamOperator?


I think it should be a combination of ExecNodeMetadata with rough 
estimates (declaration) or StreamExecNode, but it should not bubble into 
TwoInputStreamOperator.


Regards,
Timo

[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java


On 03.04.23 09:15, godfrey he wrote:

Hi Jane,

Thanks for driving this FLIP.

I think the compiled plan solution and the hint solution do not
conflict; the two can exist at the same time.
The compiled plan solution can address the needs of advanced users and
platform users, where the state TTL of all stateful operators can be defined
by the user. The hint solution, in turn, can address some specific simple
scenarios and is very user-friendly, convenient, and unambiguous to use.

Some stateful operators are not compiled from SQL directly, such as the
ChangelogNormalize and SinkUpsertMaterializer mentioned above. I notice the
example given by Yisha has a hint propagation problem which does not conform
to the current design. The rough idea of the hint solution should be to keep
it simple (only the common operators are supported) and easy to understand
(no hint propagation).

If the hint solution is supported, a compiled plan generated from a
query with state TTL hints can still be further modified in its state TTL
parts.

So, I prefer the hint solution to be discussed in a separate FLIP. I
think that FLIP may need a lot of discussion.

Best,
Godfrey

周伊莎  于2023年3月30日周四 22:04写道:


Hi Jane,

Thanks for your detailed response.

You mentioned that there are 10k+ SQL jobs in your production

environment, but only ~100 jobs' migration involves plan editing. Is 10k+
the number of total jobs, or the number of jobs that use stateful
computation and need state migration?



10k is the number of SQL jobs that enable periodic checkpoints. And
surely, if users change their SQL in a way that results in changes to the
plan, they need to do state migration.

- You mentioned that "A truth that can not be ignored is that users

usually tend to give up editing TTL(or operator ID in our case) instead of
migrating this configuration between their versions of one given job." So
what would users prefer to do if they're reluctant to edit the operator
ID? Would they submit the same SQL as a new job with a higher version to
re-accumulating the state from the earliest offset?



You're exactly right. People will tend to re-accumulate the state from a
given offset by changing the namespace of their checkpoint.
Namespace is an internal concept and restarting the sql job in a new
namespace can be simply understood as submitting a new job.

Back to your suggestions, I noticed that FLIP-190 [3] proposed the

following syntax to perform plan migration



The 'plan migration' I mentioned in my last reply may be inaccurate. It's more
like 'query evolution'. In other words, if a user submitted a SQL job with a
configured compiled plan and then changes the SQL, the compiled plan changes
too; how do we move the configuration from the old plan to the new plan?
IIUC, FLIP-190 aims to solve issues in Flink version upgrades and leaves out
'query evolution', which is a fundamental change to the query, e.g.
adding a filter condition or a different aggregation.
And I'm really looking forward to a solution for query evolution.

And I'm also curious about how to use the hint approach to cover cases like

- configuring TTL for operators like ChangelogNormalize,
SinkUpsertMaterializer, etc.; these operators are derived by the planner
implicitly
- coping with the state TTL of two-/multiple-input stream operators, like
join, and other operations like row_number, rank, correlate, etc.



  Actually, in our company , we make operators in the query 

Re: [DISCUSS] FLIP-296: Watermark options for table API & SQL

2023-03-09 Thread Timo Walther
... we need to be careful which strategies we offer by default.

@Jark @Timo I'm sorry, perhaps I don't understand what your concerns about
CompiledPlan are; maybe I missed something else. Maybe you can look at my
POC first to see if there is something to worry about.

Sorry, I forgot to remind you that Timo's concern about the changes to the
CompiledPlan looks like it is still not covered in the FLIP.

@Jing We could have more discussion about naming, but I prefer that the
naming should be consistent with the DataStream API.
About aligning splits/partitions/shards, maybe you missed FLIP-217 [2],
which aims to support watermark alignment of source splits.

After reading the most up-to-date FLIP, I didn't find any information on
whether this solution will support aligning splits/partitions/shards [1].
Did I miss anything?


Best
Kui Yuan

[1] the POC:
https://github.com/yuchengxin/flink/tree/yuankui/watermark_params
[2] FLIP-217:







https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits



Jing Ge  于2023年3月3日周五 08:03写道:


Hi,

Thanks Kui for driving this FLIP and thanks all for the informative
discussion.

@Timo

Your suggestion about the naming convention is excellent. Thanks! I was
wondering why you, exceptionally, suggested 'scan.idle-timeout' instead of
'scan.watermark.idle-timeout'. I must be missing something here.

There is one more nit. I am just aware that "drift" is used for the
watermark alignment. It seems to be fine while using the DataStream API,
because we will not really see it. But with the OPTIONS in SQL, a much
bigger group of users (including SRE, tech support, etc.) will see the word
"drift". Given that "drift" wasn't widely used yet, and that in all training
materials and the Flink docs [1][2][3] (search for "lag") "lag" has been used
to describe the timestamp difference between a watermark and its
corresponding event, do we really need to introduce another term for the
same thing? How about using 'scan.watermark.alignment.max-lag'='1min'
and changing the parameter name from maxAllowedWatermarkDrift to
maxAllowedWatermarkLag [4] for naming consistency? Just my two cents.

@Kui

After reading the most up-to-date FLIP, I didn't find any information on
whether this solution will support aligning splits/partitions/shards [1].
Did I miss anything?

+1 for the concern about the Table API. We'd better keep the Table API and
SQL synced for new features.

Best regards,
Jing


[1]









https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_

[2]









https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/event-time/built_in/#fixed-amount-of-lateness


[3]









https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/

[4]









https://github.com/apache/flink/blob/4aacff572a9e3996c5dee9273638831e4040c767/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L169




On Wed, Mar 1, 2023 at 3:54 PM Timo Walther 

wrote:



Reg. 2:
  > event gap emit strategy [...] no matter in DataStream or SQL

Jark raised a very good point. I thought we only expose what is contained
in the DataStream API already. If this strategy is not part of the
DataStream API, I would like to exclude it from the FLIP. We need to be
careful which strategies we offer by default.

Reg. 1:
This already has a JIRA ticket with additional thoughts on this topic:
https://issues.apache.org/jira/browse/FLINK-25221

Regards,
Timo



On 01.03.23 12:31, Jark Wu wrote:

Sorry, I forgot to remind you that Timo's concern about the

changes

to

the

CompiledPlan looks like is still not covered in the FLIP.

Best,
Jark

On Wed, 1 Mar 2023 at 19:28, Jark Wu  wrote:


Hi Kui,

Thank you for the great proposal, I think this is already in a

good

shape.


Just a kind reminder, according to the community

guidelines[1],

if there are unresponsive reviewers, a typical reasonable time
to wait for responses is one week, but be pragmatic about it.

Regarding the FLIP, I have some comments below:

1. IIRC, this is the first time we introduce the

framework-level

connector

options that the option is not recognized and handled by

connectors.

The FLIP should cover how framework filters the watermark

related

options

to avoid discover connector factory failed, and what happens

if

the

connector
already supported the conflict options.

2. I'm not sure about the usage scenarios of event gap emit

strategy.

Do

you have any specific use case of this strategy? I'm confused

why

no

one

requested this strategy before no matter in DataStream or SQL,

but

maybe

I missed something. I'm not against to add this option, but

just

want

to

be
careful when adding new API because it's hard to remove in the

future.



3. Adding a "Public Interface"[

Re: [DISCUSS] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-09 Thread Timo Walther

Hi Lincoln,

thanks for proposing the FLIP. The general idea to expose the target 
columns in DynamicTableSink#Context sounds good to me.


In the FLIP I found the JavaDoc a bit confusing:

```
The column list will be empty for 'insert into target select ...'.
```

This could mean either an empty Optional or an empty array. Maybe you can 
rephrase that a bit in the implementation.
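
To make the distinction concrete, a small sketch of the two cases from the
statement side (table and column names are just examples):

```
// Partial insert: per the FLIP, the sink would receive the declared target
// columns, here (a, c).
tEnv.executeSql("INSERT INTO target (a, c) SELECT x, z FROM source");

// Full insert without a column list: this is the case where, per the FLIP,
// the sink receives an empty/absent column list, which the JavaDoc should spell out.
tEnv.executeSql("INSERT INTO target SELECT x, y, z FROM source");
```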


Otherwise +1.

Timo


On 08.03.23 14:00, Lincoln Lee wrote:

Hi Jing,
Agree with you that using formal terms can be easier for users; I've updated
the FLIP [1]. Since this is only one of the application scenarios for
partial insert, our Javadoc for the corresponding interface will describe
the partial insert itself from a generic point of view, WDYT?

@Jacky thanks for your feedback!
here are my thoughts for the two questions:
for this scenario, I don't think the planner should report an error. We
cannot assume that such usage will necessarily result in errors or that
users are unaware of potential risks (just like in a database, similar
operations are not prompted with errors). In the streaming scenario,
regarding the risks associated with the multi-insert operation with
overlapping fields, we may consider expanding the plan advice (FLIP-280 has
just added possibilities to support this) to prompt users instead of
rejecting the operation with an error.

1. if the two insert into with same columns, the result is not

nondeterminism. will it check in planner and throw exception

yes, not all connectors support partial insert. Therefore, the introduction
of this interface is only intended as additional information for the
connectors that need it. The new `targetColumns` only provide the column
list information corresponding to the statement according to the SQL
standard, and existing connectors do not need to make any passive changes
by default.

2. some sink connectors can not supports it like queue such as kafka

compacted topic. will also it check in planner  and throw exception

welcome your feedback!


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081


Best,
Lincoln Lee


Jacky Lau  于2023年3月8日周三 20:11写道:


Thanks for bringing this up. This is a good feature, but I have two
questions:
1. If two INSERT INTO statements write the same columns, isn't the result
nondeterministic? Will the planner check this and throw an exception?
2. Some sink connectors cannot support it, e.g. queues such as Kafka
compacted topics. Will the planner also check this and throw an exception?

Lincoln Lee  于2023年3月7日周二 14:53写道:


Hi Aitozi,

Thanks for your feedback! Yes, connectors such as HBase and JDBC can be
considered for support in the next step (JDBC, as a standard protocol, is
supported not only in traditional databases but also in more and more new
types of storage). Considering the ongoing externalization of connectors and
that the release cycles of the connectors are decoupled from the release
cycle of Flink, we can initiate corresponding support issues for specific
connectors to follow up after finalizing the API changes, WDYT?

Best,
Lincoln Lee


Hang Ruan  于2023年3月7日周二 12:14写道:


Hi, Lincoln,

Thanks for bringing this up. It looks good to me. I also agree with
Jingsong's suggestion.

Best,
Hang

Jingsong Li  于2023年3月7日周二 11:15写道:


Wow, we have 300 FLIPs...

Thanks Lincoln,

Have you considered returning an Optional?

Empty array looks a little weird to me.

Best,
Jingsong

On Tue, Mar 7, 2023 at 10:32 AM Aitozi  wrote:


Hi Lincoln,
 Thank you for sharing this FLIP. Overall, it looks good to me. I have
one question: with the introduction of this interface, will any existing
Flink connectors need to be updated in order to take advantage of its
capabilities? For example, HBase.

yuxia  于2023年3月7日周二 10:01写道:


Thanks. It makes sense to me.

Best regards,
Yuxia

- Original Message -
From: "Lincoln Lee"
To: "dev"
Sent: Monday, March 6, 2023, 10:26:26 PM
Subject: Re: [DISCUSS] FLIP-300: Add targetColumns to

DynamicTableSink#Context

to solve the null overwrite problem of partial-insert

hi yuxia,

Thanks for your feedback and for tracking the issue of the UPDATE statement!
I've updated the FLIP [1] and also the POC [2].
Since the bug and the FLIP are orthogonal, we can focus on finalizing the
API changes first, and then work on the FLIP implementation and the bugfix
separately, WDYT?

[1]








https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081

[2] https://github.com/apache/flink/pull/22041

Best,
Lincoln Lee


yuxia  于2023年3月6日周一 21:21写道:


Hi, Lincoln.
Thanks for bringing this up. +1 for this FLIP; it's helpful for external
storage systems to implement partial updates.
The FLIP looks good to me. I only want to add one comment: the UPDATE
statement also doesn't support updating nested columns. I have created
FLINK-31344 [1] to track it.
Maybe we also need to explain this in the FLIP.

[1] https://issues.apache.org/jira/browse/FLINK-31344

Best regards,
Yuxia

- Original Message -
From: "Lincoln Lee"
To: "dev"

Re: [VOTE] FLIP-297: Improve Auxiliary Sql Statements

2023-03-09 Thread Timo Walther

+1 (binding)

Thanks,
Timo


On 08.03.23 14:59, Sergey Nuyanzin wrote:

+1
(binding) I guess, based on [1]; please correct me if I am wrong
[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Actions


On Wed, Mar 8, 2023 at 8:04 AM yuxia  wrote:


+1 (non-binding)
Thanks Ran Tao for driving it.

Best regards,
Yuxia

- Original Message -
From: "Jark Wu"
To: "dev"
Sent: Tuesday, March 7, 2023, 8:50:05 PM
Subject: Re: [VOTE] FLIP-297: Improve Auxiliary Sql Statements

+1 (binding)

Best,
Jark


2023年3月7日 17:07,Jing Ge  写道:

+1
Thanks!

Best regards,
Jing

On Tue, Mar 7, 2023 at 9:51 AM ConradJam  wrote:


+1 (non-binding).

Hang Ruan  于2023年3月7日周二 16:14写道:


Thanks for Ran's work.
+1 (non-binding).

Best,
Hang

Ran Tao  于2023年3月7日周二 15:59写道:


thanks Lau.

The vote will last for at least 72 hours (03/09, 19:30 UTC+8).
It needs consensus approval, requiring 3 binding +1 votes and no
binding vetoes.


Best Regards,
Ran Tao


Jacky Lau  于2023年3月7日周二 15:11写道:


Thanks Ran.
+1 (non-binding)

Regards,
Jacky Lau

Ran Tao  于2023年3月6日周一 19:32写道:


Hi Everyone,


I want to start the vote on FLIP-297: Improve Auxiliary Sql

Statements

[1].

The FLIP was discussed in this thread [2].


The goal of the FLIP is to improve Flink's auxiliary SQL statements
(compared with the SQL standard or other mature engines).

The vote will last for at least 72 hours (03/09, 19:30 UTC+8)
unless there is an objection or insufficient votes. Thank you all.

[1]











https://cwiki.apache.org/confluence/display/FLINK/FLIP-297%3A+Improve+Auxiliary+Sql+Statements

[2]

https://lists.apache.org/thread/54fyd27m8on1cf3hn6dz564zqmkobjyd


Best Regards,
Ran Tao
https://github.com/chucheng92










--
Best

ConradJam










Re: [DISCUSS] FLIP-296: Watermark options for table API & SQL

2023-03-01 Thread Timo Walther

Reg. 2:
> event gap emit strategy [...] no matter in DataStream or SQL

Jark raised a very good point. I thought we only expose what is 
contained in the DataStream API already. If this strategy is not part of the 
DataStream API, I would like to exclude it from the FLIP. We need to be 
careful which strategies we offer by default.


Reg. 1:
This already has a JIRA ticket with additional thoughts on this topic:
https://issues.apache.org/jira/browse/FLINK-25221

Regards,
Timo



On 01.03.23 12:31, Jark Wu wrote:

Sorry, I forgot to remind you that Timo's concern about the changes to the
CompiledPlan looks like is still not covered in the FLIP.

Best,
Jark

On Wed, 1 Mar 2023 at 19:28, Jark Wu  wrote:


Hi Kui,

Thank you for the great proposal, I think this is already in a good shape.

Just a kind reminder, according to the community guidelines[1],
if there are unresponsive reviewers, a typical reasonable time
to wait for responses is one week, but be pragmatic about it.

Regarding the FLIP, I have some comments below:

1. IIRC, this is the first time we introduce framework-level connector
options that are not recognized and handled by the connectors themselves.
The FLIP should cover how the framework filters the watermark-related options
to avoid failing connector factory discovery, and what happens if a
connector already supports conflicting options.

2. I'm not sure about the usage scenarios of the event gap emit strategy. Do
you have any specific use case for this strategy? I'm confused why no one
requested this strategy before, neither in DataStream nor in SQL, but maybe
I missed something. I'm not against adding this option, but I just want to be
careful when adding new API because it's hard to remove in the future.


3. Adding a "Public Interface"[2] section to summarize the
proposed APIs and options would be better for developers to
know the impact. Currently, the APIs are scattered in the long
design sections.

Best,
Jark


[1]:
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

On Wed, 1 Mar 2023 at 16:56, Kui Yuan  wrote:


Hi all,

Thanks for all discussions!

Does anyone else have questions or suggestions? If not, I will start a vote
thread later.

Best
Kui Yuan

kui yuan  于2023年2月27日周一 20:21写道:


Hi Timo,

Thanks for your advice. I totally agree with your suggestion on the naming
convention; I will rename these options and update the FLIP later, thanks
very much.

In our internal implementation we had put these options inside the
`FactoryUtil`, just as you expect. We have also taken into account the
changes to the CompiledPlan and have packaged these options appropriately
to minimize intrusiveness and ensure compatibility with the
`WatermarkPushDownSpec`.


A hint to the implementation: I would suggest that we add those

options

to `FactoryUtil`. All cross-connector options should end up there.




Please also consider the changes to the CompiledPlan in your FLIP.

This

change has implications on the JSON format as watermark strategy of
ExecNode becomes more complex, see WatermarkPushDownSpec


Best
Kui Yuan

Timo Walther  于2023年2月27日周一 18:05写道:


Hi Kui Yuan,

thanks for working on this FLIP. Let me also give some comments about
the proposed changes.

I support the direction of this FLIP about handling these
watermark-specific properties through options and /*+OPTIONS(...) */
hints.

Regarding naming, I would like to keep the options in sync with

existing

options:

  > 'watermark.emit.strategy'='ON_EVENT'

Let's use lower case (e.g. `on-event`) that matches with properties

like

sink.partitioner [1] or sink.delivery-guarantee [2].

  > 'source.idle-timeout'='1min'

According to FLIP-122 [3], we want to prefix all scan-source related
properties with `scan.*`. This clearly includes idle-timeout and
actually also watermark strategies which don't apply for lookup

sources.


Summarizing the comments above, we should use the following options:

'scan.watermark.emit.strategy'='on-event',
'scan.watermark.emit.on-event.gap'='1',
'scan.idle-timeout'='1min',
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s'

I know that this makes the keys even longer, but given that those
options are for power users this should be acceptable. It also clearly
indicates which options are for sinks, scans, and lookups. This
potentially also helps in allow lists.

A hint to the implementation: I would suggest that we add those options
to `FactoryUtil`. All cross-connector options should end up there.

Please also consider the changes to the CompiledPlan in your FLIP. This
change has implications on the JSON format as watermark strategy of
ExecNode becomes more complex, see WatermarkPushDownSpec [4].

Regards,
Timo


[1]



https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#sink-partit

Re: [DISCUSS] FLIP-297: Improve Auxiliary Sql Statements

2023-02-28 Thread Timo Walther

Hi Ran Tao,

Thanks for working on this FLIP. The FLIP is in a pretty good shape 
already and I don't have much to add.


Will we also support ILIKE in queries? Or is this a pure DDL 
expression? For consistency, we should support it in SELECT and Table 
API as well. I hope this is not too much effort. I hope everybody is 
aware that ILIKE is not standard-compliant but seems to be used by a 
variety of vendors.
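
For example, the SHOW statements from the FLIP would then come in both
variants (sketch, assuming the FLIP's proposed syntax):

```
// LIKE is case sensitive; ILIKE (non-standard) matches case-insensitively.
tEnv.executeSql("SHOW TABLES LIKE 'Orders%'").print();
tEnv.executeSql("SHOW TABLES ILIKE 'orders%'").print();
```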


> Because it may be modified under discuss, we put it on the google 
docs. please see FLIP-297: Improve Auxiliary Sql Statements Docs


This comment confused me. It would be nice to have the Wiki page as the 
single source of truth and abandon the Google doc. In the past we used 
Google Docs more but nowadays I support using only the Wiki to avoid any 
confusion.


Regards,
Timo


On 28.02.23 12:51, Ran Tao wrote:

thanks Sergey, sounds good.
You can add in FLIP ticket[1].

[1] https://issues.apache.org/jira/browse/FLINK-31256

Best Regards,
Ran Tao
https://github.com/chucheng92


Sergey Nuyanzin  于2023年2月28日周二 19:44写道:


Currently I think we can load from the jar and check the services file to
get the connector type. but is it necessary we may continue to discuss.
Hi, Sergey, WDYT?


Another idea is FactoryUtil#discoverFactories and checking whether a factory
implements DynamicTableSourceFactory or DynamicTableSinkFactory.
With versions it could be trickier...
Moreover, it seems the version could sometimes be a part of the name [1].
I think name and type could be enough, or please correct me if I'm wrong.


or can we open a single ticket under this FLIP?

I have a relatively old JIRA issue [2] for showing connectors with a POC PR.
Could I propose moving this JIRA issue as a subtask under the FLIP one and
reviving it?

[1]

https://github.com/apache/flink/blob/161014149e803bfd1d3653badb230b2ed36ce3cb/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java#L65-L69
[2] https://issues.apache.org/jira/browse/FLINK-25788

On Tue, Feb 28, 2023 at 11:56 AM Ran Tao  wrote:


Hi Jark, thanks.

About ILIKE:

I have updated the FLIP for ILIKE support (including how the existing
showTables & showColumns change).

About SHOW CONNECTORS, @Sergey:

Currently I think we can load from the jar and check the services file to
get the connector type, but whether it is necessary we may continue to
discuss. Hi Sergey, WDYT? Or can we open a single ticket under this FLIP?


Best Regards,
Ran Tao


Jark Wu  于2023年2月28日周二 17:45写道:


Besides, if we introduce the ILIKE, we should also add this feature for
the previous SHOW with LIKE statements. They should be included in this
FLIP.

Best,
Jark


2023年2月28日 17:40,Jark Wu  写道:

Hi Ran,

Could you add descriptions about what’s the behavior and differences

between the LIKE and ILIKE?


Besides, I don’t see the SHOW CONNECTOR syntax and description and

how

it works in the FLIP. Is it intended to be included in this FLIP?


Best,
Jark



2023年2月28日 10:58,Ran Tao  写道:

Hi guys, thanks for the advice.

Allow me to make a small summary:

1. Support ILIKE
2. Use the Catalog API to support SHOW operations
3. A dedicated FLIP is needed to support INFORMATION_SCHEMA
4. Support SHOW CONNECTORS

If there are no other questions, I will try to start a VOTE for this FLIP.

WDYT?

Best Regards,
Ran Tao


Sergey Nuyanzin  于2023年2月27日周一 21:12写道:


Hi Jark,

thanks for your comment.


Considering they
are orthogonal and information schema requires more complex design

and

discussion, it deserves a separate FLIP

I'm ok with a separate FLIP for INFORMATION_SCHEMA.


Sergey, are you willing to contribute this FLIP?

Seems I need to have more research done for that.
I would try to help/contribute here


On Mon, Feb 27, 2023 at 3:46 AM Ran Tao 

wrote:



Hi Jing, thanks.

About ILIKE: from my survey of some popular engines, I found that only
Snowflake has this syntax in SHOW with filtering. Do we need to support it?
If yes, then some of the currently existing SHOW operations need to be
addressed as well.

About ShowOperation with LIKE: it's a good idea. Yes, two parameters for the
constructor can work. Thanks for your advice.


Best Regards,
Ran Tao


Jing Ge  于2023年2月27日周一 06:29写道:


Hi,

@Aitozi

This is exactly why LoD has been introduced: to avoid exposing internal
structure (2nd and lower level API).

@Jark

IMHO, there is no conflict between LoD and "high power-to-weight ratio"
with the given example: List.subList() returns the List interface itself;
no internal or further interface has been exposed. After offering
tEnv.getCatalog(), "all" methods in the Catalog interface have been provided
by TableEnvironment (via getCatalog()). From the user's perspective and the
maintenance perspective there is no/less difference between providing them
directly via TableEnvironment or via getCatalog(). They are all exposed.
Using getCatalog() will reduce the number of boring wrapper methods, but on
the other hand not every method in Catalog needs to be exposed, so the
number of wrapper methods would 

Re: [DISCUSS] FLIP-296: Watermark options for table API & SQL

2023-02-27 Thread Timo Walther

Hi Kui Yuan,

thanks for working on this FLIP. Let me also give some comments about 
the proposed changes.


I support the direction of this FLIP about handling these 
watermark-specific properties through options and /*+OPTIONS(...) */ hints.


Regarding naming, I would like to keep the options in sync with existing 
options:


> 'watermark.emit.strategy'='ON_EVENT'

Let's use lower case (e.g. `on-event`) that matches with properties like 
sink.partitioner [1] or sink.delivery-guarantee [2].


> 'source.idle-timeout'='1min'

According to FLIP-122 [3], we want to prefix all scan-source related 
properties with `scan.*`. This clearly includes idle-timeout and 
actually also watermark strategies which don't apply for lookup sources.


Summarizing the comments above, we should use the following options:

'scan.watermark.emit.strategy'='on-event',
'scan.watermark.emit.on-event.gap'='1',
'scan.idle-timeout'='1min',
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s'

I know that this makes the keys even longer, but given that those 
options are for power users this should be acceptable. It also clearly 
indicates which options are for sinks, scans, and lookups. This 
potentially also helps in allow lists.
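
To illustrate, with the naming above a per-query override via the OPTIONS hint
could look like this (the option keys are the proposal of this thread and do
not exist yet; `user_actions` is assumed to be a registered table with a
WATERMARK definition):

```
tEnv.executeSql(
    "SELECT * FROM user_actions /*+ OPTIONS("
        + "'scan.watermark.emit.strategy'='on-periodic',"
        + "'scan.watermark.alignment.group'='alignment-group-1',"
        + "'scan.watermark.alignment.max-drift'='1min') */");
```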


A hint to the implementation: I would suggest that we add those options 
to `FactoryUtil`. All cross-connector options should end up there.


Please also consider the changes to the CompiledPlan in your FLIP. This 
change has implications on the JSON format as watermark strategy of 
ExecNode becomes more complex, see WatermarkPushDownSpec [4].


Regards,
Timo


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#sink-partitioner
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/#sink-delivery-guarantee
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[4] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java



On 24.02.23 04:55, kui yuan wrote:

Hi all,

I have updated the FLIP according to the discussion, and we will extend the
watermark-related features with both table options and the 'OPTIONS' hint, like
this:

```
-- configure in table options
CREATE TABLE user_actions (
   ...
   user_action_time TIMESTAMP(3),
   WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
   'watermark.emit.strategy'='ON_PERIODIC',
   ...
);

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('watermark.emit.strategy'=
'ON_PERIODIC') */
```

Does everybody have any other questions?

Best
Kui Yuan

On Thu, Feb 23, 2023 at 8:05 PM kui yuan wrote:


Hi all,

Thanks for all suggestions.

We will extend the watermark-related features in SQL layer with dynamic
table options and 'OPTIONS' hint, just as everyone expects. I will modify
Flip-296 as discussed.

@Martijn As far as I know, there is no hint interface in the Table API, so we
can't use hints in the Table API directly. If we need to extend the hint
interface in the Table API, maybe we need another FLIP. However, if we
extend the watermark-related features in the dynamic table options, maybe
we are able to use them indirectly in the Table API like this[1]:

```
// register a table named "Orders"
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING,
amount INT) WITH ('watermark.emit.strategy'='ON_EVENT'...)");
```

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/

Best
Kui Yuan

On Thu, Feb 23, 2023 at 5:46 PM Yun Tang wrote:


Thanks for the warm discussions!

I had an offline discussion with Kui about the replies. I think I could
give some explanations on the original intention to introduce another
WATERMARK_PARAMS. If we take a look at the current datastream API, the
watermark strategy does not belong to any specific connector. And we
thought the dynamic table options were more like the configurations within
some specific connector.

From the review comments, I think most people are fine with making it part
of the dynamic table options. I think this is fine if we give a clearer
scope definition of the dynamic table options here. And I also agree with
Jingsong's concern about adding SQL syntax, which was the most concerning
part before launching this discussion.

For Martijn's concern, if we accept making the watermark-related options
part of the dynamic table options, the problem becomes another topic: how to
support the dynamic table options in the Table API, which deserves a FLIP
of its own.

Best
Yun Tang

From: Martijn Visser 
Sent: Thursday, February 23, 2023 17:14
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-296: Watermark options for table API & SQL

Hi,

While I can understand that there's a desire to first focus on 

Re: [DISCUSS] Proposal to improve Auxiliary Sql Statements

2023-02-21 Thread Timo Walther

Hi Ran,

adding additional filter and selection capabilities makes sense. 
However, I would recommend not only checking Spark or Hive but also 
looking at bigger commercial players such as Snowflake, Oracle, and SQL Server.


In any case, maybe we should also put more thought into a Flink 
INFORMATION_SCHEMA as the SQL standard defines it: a queryable table that 
contains metadata on which we can run arbitrary SQL logic, not only 
filtering but also aggregations such as counting, or projections.


This would avoid touching the parser every time and introducing more and 
more SQL keywords.
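
Purely as a sketch of the idea (Flink does not ship an INFORMATION_SCHEMA today, so the schema and column names below are hypothetical), filtering and aggregating over metadata could then look like any other query:

```java
// Hypothetical: INFORMATION_SCHEMA.TABLES and its columns do not exist in Flink yet.
tEnv.executeSql(
        "SELECT table_schema, COUNT(*) AS num_tables "
            + "FROM INFORMATION_SCHEMA.TABLES "
            + "WHERE table_name LIKE 'kafka%' "
            + "GROUP BY table_schema")
    .print();
```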


What do you think?

Regards,
Timo

On 21.02.23 14:32, Martijn Visser wrote:

Hi Ran,

When you're creating your FLIP, please also have a look at Calcite SQL
parser, since that's used by Flink [1]

Best regards,

Martijn

[1] https://calcite.apache.org/docs/reference.html

On Tue, Feb 21, 2023 at 2:10 PM Ran Tao  wrote:


Yes, I totally agree with your point. The current flink sql syntax is tiled
together without classification.
Let me try to summarize these four categories, not just Auxiliary
Statements. And compare it with other popular engines.

Thank you, Jing !

Best Regards,
Ran Tao
https://github.com/chucheng92


On Tue, Feb 21, 2023 at 8:50 PM Jing Ge wrote:


+ 1 for a new FLIP

It would be great if we could take this opportunity to go through all
current Flink SQL syntax and define a feasible classification in the FLIP.

In my opinion, the 4 categories, i.e. DDL, DML, DQL, DAL, used in the Spark
doc [1] make sense.
We will then have a better big picture to know the gap between Flink and
other big data engines like Spark and what should be done.

Best regards,
Jing

[1]
https://spark.apache.org/docs/latest/sql-ref-syntax.html#auxiliary-statements


On Tue, Feb 21, 2023 at 1:15 PM Ran Tao  wrote:


Thanks. I will create a FLIP to illustrate some details.


Best Regards,
Ran Tao
https://github.com/chucheng92


On Tue, Feb 21, 2023 at 8:03 PM Jark Wu wrote:


Thank you,

I think this is worth a FLIP design doc to discuss the detailed

syntax.

Could you prepare a FLIP for that?

Best,
Jark

On Tue, 21 Feb 2023 at 19:37, Ran Tao  wrote:


Hi, Jark. thanks. I have added a google doc.

https://docs.google.com/document/d/1hAiOfPx14VTBTOlpyxG7FA2mB1k5M31VnKYad2XpJ1I/edit?usp=sharing


On Tue, Feb 21, 2023 at 7:27 PM Jark Wu wrote:


Hi Ran,

I think it’s always nice to support new syntax if it’s useful for the users.
From my side, your syntax table is broken. Could you share it with a
Google doc or create a JIRA issue?

Best,
Jark




On Feb 21, 2023, at 5:51 PM, Ran Tao wrote:

Hi guys. When I recently used Flink SQL to manage internal metadata
(catalog/databases/table/functions), I found that many current Flink SQL
statements do not support filtering or some advanced syntax; however, these
abilities are very useful to end-users.

These are some statements I have collected so far, which are supported on
other big data engines, such as Spark, Hive or Presto. I wonder if we can
support these abilities?

In addition, the subject of this email is named 'Auxiliary Statements'
mainly because the alignment of these statements will not have much impact
on the core SQL runtime.

Statement          | Support or Not | With Advanced Syntax Or Not (in/from or like)
show create table  | Yes            | Yes
show tables        | Yes            | Yes
show columns       | Yes            | Yes
show catalogs      | Yes            | without filter
show databases     | Yes            | without filter
show functions     | Yes            | without filter
show views         | Yes            | without filter
show modules       | Yes            | without filter
show jars          | Yes            | without filter
show jobs          | Yes            | without filter

We can see that many current Flink SQL statements only support showing the
full data, without a 'FROM/IN' or 'LIKE' filter clause.

Statement          | Support or Not
describe database  | No
describe table     | Yes
describe function  | No
describe query     | No

Currently, Flink only supports describing tables.

Also, please let me know if there is a mistake. Looking forward to your reply.


Best Regards,
Ran Tao
https://github.com/chucheng92

[ANNOUNCE] New Apache Flink Committer - Sergey Nuyanzin

2023-02-21 Thread Timo Walther

Hi everyone,

On behalf of the PMC, I'm very happy to announce Sergey Nuyanzin as a 
new Flink Committer.


Sergey started contributing small improvements to the project in 2018. 
Over the past 1.5 years, he has become more active and focused on adding 
and reviewing changes to the Flink SQL ecosystem.


Currently, he is upgrading Flink's SQL engine to the latest Apache 
Calcite version [1][2][3] and helps in updating other project-wide 
dependencies as well.


Please join me in congratulating Sergey Nuyanzin for becoming a Flink 
Committer!


Best,
Timo Walther (on behalf of the Flink PMC)

[1] https://issues.apache.org/jira/browse/FLINK-29932
[2] https://issues.apache.org/jira/browse/FLINK-21239
[3] https://issues.apache.org/jira/browse/FLINK-20873


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-06 Thread Timo Walther

Hi Feng,

this is indeed a good proposal.

1) It makes sense to improve the catalog listing for platform providers.

2) Other feedback from the past has shown that users would like to avoid 
the default in-memory catalog and offer their catalog before a 
TableEnvironment session starts.


3) Also we might reconsider whether a default catalog and default 
database make sense. Or whether this can be disabled and SHOW CATALOGS 
can be used for listing first without having a default catalog.


What do you think about option 2 and 3?

In any case, I would propose we pass a CatalogProvider to 
EnvironmentSettings and only allow a single instance. Catalogs should 
never shadow other catalogs.
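
To make option 2 concrete, the wiring could look roughly like the sketch below. Note that `withCatalogProvider` is purely hypothetical; EnvironmentSettings has no such method today, this only illustrates where the hook would live:

```java
// Hypothetical wiring: withCatalogProvider(...) does not exist in EnvironmentSettings yet.
EnvironmentSettings settings =
        EnvironmentSettings.newInstance()
                .inStreamingMode()
                // .withCatalogProvider(myCatalogProvider)  // proposed hook, not an existing API
                .build();

TableEnvironment tEnv = TableEnvironment.create(settings);
```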


We could also use the org.apache.flink.table.factories.Factory infra and 
allow catalog providers via pure string properties. Not sure if we need 
this in the first version though.


Cheers,
Timo


On 06.02.23 11:21, Feng Jin wrote:

Hi everyone,

The original discussion address is
https://issues.apache.org/jira/browse/FLINK-30126

Currently, Flink has access to many systems, including kafka, hive,
iceberg, hudi, elasticsearch, mysql...  The corresponding catalog name
might be:
kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
mysql_database2_

As the platform for Flink SQL jobs, we need to maintain the meta
information of each system in the company, and when a Flink job
starts we need to register the catalogs with the Flink table
environment so that users can use any table through the
env.executeSql interface.

When we only have a small number of catalogs, we can register them like
this, but when there are thousands of catalogs, I think there needs to
be a dynamic loading mechanism so that we can register a catalog when
needed, speed up the initialization of the table environment, and
avoid the useless catalog registration process.

Preliminary thoughts:

A new CatalogProvider interface can be added. It contains two methods:
* listCatalogs(), which lists the names of all catalogs the provider can offer
* getCatalog(), which gets a catalog instance by catalog name.

```java
public interface CatalogProvider {

    default void initialize(ClassLoader classLoader, ReadableConfig config) {}

    Optional<Catalog> getCatalog(String catalogName);

    Set<String> listCatalogs();
}
```
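
To make the contract more tangible, here is a minimal, hypothetical implementation that only instantiates a catalog when it is first requested (GenericInMemoryCatalog is just a stand-in for a real HiveCatalog, JdbcCatalog, etc., and the hard-coded name list is an assumption):

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// Hypothetical provider: catalogs are created lazily on first access.
public class LazyCatalogProvider implements CatalogProvider {

    // assumed to come from the platform's own metadata service in practice
    private final Map<String, String> catalogToDefaultDatabase =
            Map.of("kafka_cluster1", "default", "hive_cluster1", "default");

    @Override
    public Optional<Catalog> getCatalog(String catalogName) {
        String defaultDatabase = catalogToDefaultDatabase.get(catalogName);
        if (defaultDatabase == null) {
            return Optional.empty();
        }
        // a real implementation would create the actual external catalog here
        return Optional.of(new GenericInMemoryCatalog(catalogName, defaultDatabase));
    }

    @Override
    public Set<String> listCatalogs() {
        return catalogToDefaultDatabase.keySet();
    }
}
```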


The corresponding implementation in CatalogManager is as follows:

```java
public class CatalogManager {

    private @Nullable CatalogProvider catalogProvider;

    private Map<String, Catalog> catalogs;

    public void setCatalogProvider(CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    public Optional<Catalog> getCatalog(String catalogName) {
        // If there is no corresponding catalog in catalogs,
        // get the catalog from the catalogProvider
        Catalog catalog = catalogs.get(catalogName);
        if (catalog == null && catalogProvider != null) {
            return catalogProvider.getCatalog(catalogName);
        }
        return Optional.ofNullable(catalog);
    }
}
```



Possible problems:

1. Catalog name conflict, how to choose when the registered catalog
and the catalog provided by catalog-provider conflict?
I prefer tableEnv-registered ones over catalogs provided by the
catalog-provider. If the user wishes to reference the catalog provided
by the catalog-provider, they can unregister the catalog in tableEnv
through the `unregisterCatalog` interface.

2. Number of CatalogProviders, is it possible to have multiple
catalogProvider implementations?
I don't have a good idea of this at the moment. If multiple
catalogProviders are supported, it brings much more convenience, but
there may be catalog name conflicts between different
catalogProviders.



Looking forward to your reply, any feedback is appreciated!


Best.

Feng Jin





Re: [DISCUSS] Hybrid Source Connector

2022-12-12 Thread Timo Walther

Hi Ran,

Thanks for proposing a FLIP. Btw according to the process, the subject 
of this email should be `[DISCUSS] FLIP-278: Hybrid Source Connector` so 
that people can identify this discussion as a FLIP discussion.


Supporting the hybrid source for SQL was a long-standing issue on our 
roadmap. Happy to give feedback here:


1) Options

Coming up with stable long-term options should be a shared effort. 
Having an index as a key could cause unintended side effects if the 
index is not correctly chosen, so I would suggest we use IDs instead.


What do you think about the following structure?

CREATE TABLE ... WITH (
  'sources'='historical;realtime',   -- Config option of type string list
  'historical.connector' = 'filesystem',
  'historical.path' = '/tmp/a.csv',
  'historical.format' = 'csv',
  'realtime.path' = '/tmp/b.csv',
  'realtime.format' = 'csv'
)

I would limit the IDs to simple [a-z0-9_] identifiers. Once we support 
metadata columns, we can also propagate these IDs easily.
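
For orientation, the table above would ultimately be translated into the existing DataStream-level HybridSource; a rough sketch of that target, assuming a bounded file source followed by an unbounded Kafka source (paths, topic, and bootstrap servers are made up):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

// bounded "historical" part
FileSource<String> historical =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/a.csv"))
                .build();

// unbounded "realtime" part
KafkaSource<String> realtime =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("user_actions")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

// chain both sources; the SQL connector would build this from the options above
HybridSource<String> hybrid =
        HybridSource.builder(historical)
                .addSource(realtime)
                .build();
```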


2) Schema field mappings

The FLIP mentions `schema-field-mappings`; could you elaborate on this in 
the document?


3) Start position strategies

Have you thought about how we can represent start position strategies? 
The FLIP is very minimal, but it would be nice to at least hear some 
opinions on this topic. Maybe we can come up with some general strategy 
that makes the most common use cases possible in the near future.


Thanks,
Timo



On 08.12.22 10:26, Ran Tao wrote:

Hi guys. HybridSource is a good feature, but the released versions have not
supported the Table & SQL API for a long time.

I notice that there is a related ticket here:
https://issues.apache.org/jira/browse/FLINK-22793
but the progress is slow; I wonder whether we can push this feature forward.

I have written a FLIP for discussion and look forward to your comments.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225





Re: [DISCUSS] FLIP-275: Support Remote SQL Client Based on SQL Gateway

2022-12-12 Thread Timo Walther

Hi everyone,

sorry to jump into this discussion so late.

> So we decided to revert the RowFormat related changes and let the 
client to resolve the print format.


Could you elaborate a bit on this topic in the FLIP? I still believe 
that we need 2 types of output formats.


Format A: for the SQL Client CLI and other interactive notebooks that 
just uses SQL CAST(... AS STRING) semantics executed on the server side


Format B: for JDBC SDK or other machine-readable downstream libraries

Take a TIMESTAMP WITH LOCAL TIME ZONE as an example. The string 
representation depends on a session configuration option. Clients might 
not be aware of this session option, so the formatting must happen on 
the server side.
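
As a small illustration of why format A has to be rendered server-side (a sketch only; the printed values assume the stated zones and standard Flink string casting):

```java
import java.time.ZoneId;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// the same TIMESTAMP_LTZ value renders differently depending on the session time zone
tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
tEnv.executeSql("SELECT CAST(TO_TIMESTAMP_LTZ(0, 3) AS STRING)").print();
// expected: 1970-01-01 00:00:00.000

tEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
tEnv.executeSql("SELECT CAST(TO_TIMESTAMP_LTZ(0, 3) AS STRING)").print();
// expected: 1970-01-01 01:00:00.000
```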


However, when the downstream consumer is a library, maybe the library 
would like to get the raw millis/nanos since epoch.


Also nested rows and collections might be better encoded with format B 
for libraries but interactive sessions are happy if nested types are 
already formatted server-side, so not every client needs custom code for 
the formatting.


Regards,
Timo



On 06.12.22 15:13, godfrey he wrote:

Hi, zelin


The CLI will use default print style for the non-query result.

Please make sure the print results of EXPLAIN/DESC/SHOW CREATE TABLE
commands are clear.


We think it’s better to add the root cause to the ErrorResponseBody.

LGTM

Best,
Godfrey

On Tue, Dec 6, 2022 at 5:51 PM yu zelin wrote:


Hi, Godfrey

Thanks for your feedback. Below is my thoughts about your questions.

1. About RowFormat.
I agree with your opinion. So we decided to revert the RowFormat related changes
and let the client resolve the print format.

2. About ContentType
I agree that the definition of the ContentType is not clear. But how to define the
statement type is another big question. So, we decided to only tell the query result
and non-query result apart. The CLI will use the default print style for the
non-query result.

3. About ErrorHandling
I think reusing the current ErrorResponseBody is good, but parsing the root cause
from the exception stack strings is quite hacky. We think it’s better to add the
root cause to the ErrorResponseBody.

4. About Runtime REST API Modifications
I agree, too. This part is moved to the ‘Future Work’.

Best,
Yu Zelin



On Dec 5, 2022, at 6:33 PM, godfrey he wrote:

Hi Zelin,

Thanks for driving this discussion.

I have a few comments,


Add RowFormat to ResultSet to indicate the format of rows.

We should not require the SqlGateway server to meet the display
requirements of a CliClient, because different CliClients may have
different display styles. The server just needs to return the data,
and the CliClient prints the result as needed. So RowFormat is not needed.


Add ContentType to ResultSet to indicate what kind of data the result contains.

At first sight, the values of ContentType overlap. For example, a SELECT
query will return QUERY_RESULT, but it also has a JOB_ID. OTHER is too
ambiguous; I don't know which kind of query will return OTHER.
I recommend returning the concrete type for each statement, such as
"CREATE TABLE" for "create table xx (...) with ()",
"SELECT" for "select * from xxx". The statement type can be maintained
in `Operation`s.


Error Handling

I think the current design of the error handling mechanism can meet the
requirements of the CliClient; we can get the root cause from
the stack (see ErrorResponseBody#errors). If it becomes a common
requirement (for many clients) in the future,
we can introduce this interface.


Runtime REST API Modification for Local Client Migration

I think this part is over-engineered; it belongs to optimization.
The client does not require very high performance, and the current design
can already meet our needs.
If we find performance problems in the future, we can do such optimizations.

Best,
Godfrey

On Mon, Dec 5, 2022 at 11:11 AM yu zelin wrote:


Hi, Shammon

Thanks for your feedback. I think it’s good to support a jdbc-sdk. However,
it's not supported on the gateway side yet. In my opinion, this FLIP is more
concerned with the SQL Client. How about putting “supporting jdbc-sdk” in
‘Future Work’? We can discuss how to implement it in another thread.

Best,
Yu Zelin

On Dec 2, 2022, at 6:12 PM, Shammon FY wrote:

Hi zelin

Thanks for driving this discussion.

I notice in the FLIP that the sql-client will interact with the sql-gateway
via a `REST Client` in the `Executor`. How about introducing a jdbc-sdk for
the sql-gateway?

Then the sql-client can connect to the gateway with the jdbc-sdk; on the other
hand, other applications and tools such as JMeter can use the jdbc-sdk
to connect to the sql-gateway too.

Best,
Shammon


On Fri, Dec 2, 2022 at 4:10 PM yu zelin  wrote:


Hi Jim,

Thanks for your feedback!


Should this configuration be mentioned in the FLIP?


Sure.


some way for the server to be able to limit the number of requests it receives.

I’m sorry, but this FLIP is dedicated to implementing the Remote mode, so
we didn't consider this much. I think the option is enough currently.
I will 

[jira] [Created] (FLINK-30226) Offer a Identity(De)SerializationSchema

2022-11-28 Thread Timo Walther (Jira)
Timo Walther created FLINK-30226:


 Summary: Offer a Identity(De)SerializationSchema
 Key: FLINK-30226
 URL: https://issues.apache.org/jira/browse/FLINK-30226
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Timo Walther


Sometimes it can be convenient if the API provides an 
"Identity(De)SerializationSchema" which simply takes and returns its input as 
{{byte[]}}. The Table API offers the {{raw}} format for this. DataStream API 
does not have equivalent functionality.

We could also name it `Raw(De)SerializationSchema` for consistency?
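
A minimal sketch of what the pair could look like (the class names follow the proposal above and are not an existing API):

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Proposed, not existing: passes the raw bytes through unchanged.
class IdentityDeserializationSchema implements DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) {
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

// Proposed, not existing: the symmetric serialization side.
class IdentitySerializationSchema implements SerializationSchema<byte[]> {

    @Override
    public byte[] serialize(byte[] element) {
        return element;
    }
}
```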



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-shaded 16.1, release candidate #1

2022-11-23 Thread Timo Walther

+1 (binding)

Thanks,
Timo

On 23.11.22 15:53, Piotr Nowojski wrote:

+1 (binding)

On Thu, Nov 10, 2022 at 11:38 AM Martijn Visser wrote:


+1 (binding)

- Validated hashes
- Verified signature
- Verified that no binaries exist in the source archive
- Build the source with Maven
- Verified licenses
- Verified web PR


On Tue, Nov 8, 2022 at 11:48 AM Chesnay Schepler 
wrote:


Hi everyone,
Please review and vote on the release candidate #1 for the version 16.1,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org
[2], which are signed with the key with fingerprint C2EED7B111D464BA [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-16.1-rc1" [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Release Manager

[1]
https://repository.apache.org/content/repositories/orgapacheflink-1546/
[2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-16.1-rc1
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4]
https://repository.apache.org/content/repositories/orgapacheflink-1546/
[5] https://github.com/apache/flink-shaded/releases/tag/release-16.1-rc1
[6] https://github.com/apache/flink-web/pull/580









Re: [DISCUSS] FLINK-20578 Cannot create empty array using ARRAY[]

2022-10-28 Thread Timo Walther
Actually, the new type inference stack for UDFs is smart enough to solve 
this issue. It could derive a data type for the array from the 
surrounding call (expected data type).


So this can be supported with the right type inference logic: 
cast(ARRAY() as int)


Unfortunately, ARRAY is fully managed by Calcite and maybe deeply 
integrated also into the parser (at least this is the case for ROW).
TBH if I were to design a FLIP for the collection functions, I would 
actually propose to introduce `ARRAY_OF()`, `ROW_OF()` to have full 
control over the type inference in our stack. In our stack, this also 
means that NULL is unknown. Calcite distinguishes between NULL and unknown.


So if we wanna go the easy path (without introducing ARRAY_OF), ARRAY() 
should result in ARRAY if the type can not be derived by the 
surrounding call.


Regards,
Timo

On 28.10.22 03:46, yuxia wrote:

For an empty array, seems different engine use different data type:
Hive: string
Spark: string ?
Trino:  Unknown
BigQuery: Integer

I have tried with Hive and Spark, but haven't tried with Trino and BigQuery.

I have a little doubt about Spark's behavior. But from my side, it seems 
Spark actually uses the string type, which is different from your investigation.
I try with the following sql in spark-cli:
`
select array() + 1
`

The exception is
`
Error in query: cannot resolve '(array() + 1)' due to data type mismatch: differing 
types in '(array() + 1)' (array and int).; line 1 pos 7;
'Project [unresolvedalias((array() + 1), None)]
+- OneRowRelation
`


It seems hard to decide which data type Flink should use. I'm interested in 
the reason why you would like to use the Integer type.
I haven't checked whether the SQL standard specifies it. But from my side, I 
prefer to follow Hive/Spark.

BTW: the query `SELECT COALESCE(1, cast(ARRAY() as int))` will fail in Hive and 
Spark.


Best regards,
Yuxia

- Original Message -
From: "eric xiao" 
To: "dev" 
Sent: Thursday, October 27, 2022, 9:13:51 PM
Subject: [DISCUSS] FLINK-20578 Cannot create empty array using ARRAY[]

Hi,

I would like to propose a solution to this JIRA issue. I looked at the
comments and there was some guidance around where in the code we should
update to allow for this behaviour. But I believe there are still two
questions that remain open:

1. Is this expected behaviour (i.e. users should not be able to create
an empty array)?
2. If this is indeed expected behaviour, what should the data type be of
the empty array?

I did some digging into other query engines / databases in hopes of
answering the following two questions - That can be found at the end of
this thread.

*Q: *Is this expected behaviour (i.e. users should not be able to create an
empty array)?
*A: *Yes I would say this is expected behaviour and something we should add
into the Flink SQL API.

*Q: *What should the data type be of the empty array?
*A: *This question is a bit harder to answer and I think it would require
two steps.

*Step 1: Pick a default data type to initialize the empty array.*

We can use an "empty data type" such as NULL, VOID.

*Step 2: Create or reuse type coercion to make using empty arrays easier.*

The above should unblock users from creating empty arrays, but if one would
use an empty array in a COALESCE operation.

i.e. SELECT COALESCE(int_column, ARRAY[])

I believe they will get a query issue where the type for int_column (INTEGER)
and the empty array (NULL, VOID) do not match. Thus a user will need to
cast the empty array:

i.e. SELECT COALESCE(int_column, CAST(ARRAY[] AS INT))

as such to have the COALESCE query to execute successfully.

-
*Trino*

EXPLAIN SELECT ARRAY[]

Fragment 0 [SINGLE]
 Output layout: [expr]
 Output partitioning: SINGLE []
 Output[columnNames = [_col0]]
 │   Layout: [expr:array(unknown)]
 │   Estimates: {rows: 1 (55B), cpu: 0, memory: 0B, network: 0B}
 │   _col0 := expr
 └─ Values[]
Layout: [expr:array(unknown)]
Estimates: {rows: 1 (55B), cpu: 0, memory: 0B, network: 0B}

  ("$literal$"(from_base64('AwAAAFJMRQAKQllURV9BUlJBWQEBgAA=')))

Expected behaviour? *Yes.*
Array data type? *Unknown.*

*Spark*

sc.sql.sql("SELECT ARRAY[]").explain()

  DataFrame[array(): array]

Expected behaviour? *Yes.*
Array data type? *Void.*

*BigQuery*

SELECT ARRAY[]

Field name Type  Mode
f0_  INTEGER  REPEATED

Expected behaviour? *Yes.*
Array data type? *Integer.*

Best,

Eric





Re: [DISCUSS] Drop TypeSerializerConfigSnapshot and savepoint support from Flink versions < 1.8.0

2022-10-21 Thread Timo Walther
Makes sense to me. The serializer stack is pretty complex right now, the 
more legacy we remove the better.


Regards,
Timo


On 20.10.22 12:49, Chesnay Schepler wrote:

+1

Sounds like a good reason to drop these long-deprecated APIs.

On 19/10/2022 15:13, Piotr Nowojski wrote:

Hi devs,

I would like to open a discussion to remove the long deprecated
(@PublicEvolving) TypeSerializerConfigSnapshot class [1] and the related
code.

The motivation behind this move is twofold. One reason is that it
complicates our code base unnecessarily and creates confusion on how to
actually implement custom serializers. The immediate reason is that I
wanted to clean up Flink's configuration stack a bit and refactor the
ExecutionConfig class [2]. This refactor would keep the API compatibility
of the ExecutionConfig, but it would break savepoint compatibility with
snapshots written with some of the old serializers, which had
ExecutionConfig as a field and were serialized in the snapshot. This 
issue

has been resolved by the introduction of TypeSerializerSnapshot in Flink
1.7 [3], where serializers are no longer part of the snapshot.

TypeSerializerConfigSnapshot has been deprecated and no longer used by
built-in serializers since Flink 1.8 [4] and [5]. Users were 
encouraged to

migrate to TypeSerializerSnapshot since then with their own custom
serializers. That has been plenty of time for the migration.
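
For custom serializers without configurable state, that migration usually amounts to a few lines; a hedged sketch, where MyPojo and MyPojoSerializer stand in for a user's existing classes and the serializer's snapshotConfiguration() would return this snapshot:

```java
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;

// Sketch only: MyPojo / MyPojoSerializer are placeholders for the user's existing classes.
final class MyPojoSerializerSnapshot extends SimpleTypeSerializerSnapshot<MyPojo> {

    MyPojoSerializerSnapshot() {
        // supplier that re-creates the serializer this snapshot belongs to
        super(MyPojoSerializer::new);
    }
}
```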

This proposal would have the following impact for the users:
1. we would drop support for recovery from savepoints taken with Flink <
1.7.0 for all built in types serializers
2. we would drop support for recovery from savepoints taken with Flink <
1.8.0 for built in kryo serializers
3. we would drop support for recovery from savepoints taken with Flink <
1.17 for custom serializers using deprecated TypeSerializerConfigSnapshot

1. and 2. would have a simple migration path. Users migrating from those
old savepoints would have to first start his job using a Flink version 
from

the [1.8, 1.16] range, and take a new savepoint that would be compatible
with Flink 1.17.
3. This is a bit more problematic, because users would have to first
migrate their own custom serializers to use TypeSerializerSnapshot 
(using a

Flink version from the [1.8, 1.16]), take a savepoint, and only then
migrate to Flink 1.17. However users had already 4 years to migrate, 
which

in my opinion has been plenty of time to do so.

As a side effect, we could also drop support for some of the legacy
metadata serializers from LegacyStateMetaInfoReaders and potentially 
other

places that we are keeping for the sake of compatibility with old
savepoints.

What do you think?

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.html
[2] https://issues.apache.org/jira/browse/FLINK-29379
[3] https://issues.apache.org/jira/browse/FLINK-9377
[4] https://issues.apache.org/jira/browse/FLINK-9376
[5] https://issues.apache.org/jira/browse/FLINK-11323







Re: [VOTE] FLIP-260: Expose Finish Method For TableFunction

2022-09-22 Thread Timo Walther

+1 (binding)

Please make also sure to update ExpressionReducer during implementation. 
This is not mentioned in the FLIP yet.


Thanks,
Timo

On 22.09.22 10:41, Piotr Nowojski wrote:

+1 (binding)

On Thu, Sep 22, 2022 at 8:21 AM Lincoln Lee wrote:


Hi everyone,

Thanks for all your feedback for FLIP-260[1] in the discussion thread[2],
I'd
like to start a vote for it.

The vote will be open for at least 72 hours.

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
[2] https://lists.apache.org/thread/m9hj60p3mntyctkbxrksm8l4d0s4q9dw

Best,
Lincoln Lee







[jira] [Created] (FLINK-29379) Back ExecutionConfig and CheckpointConfig by Configuration

2022-09-21 Thread Timo Walther (Jira)
Timo Walther created FLINK-29379:


 Summary: Back ExecutionConfig and CheckpointConfig by Configuration
 Key: FLINK-29379
 URL: https://issues.apache.org/jira/browse/FLINK-29379
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Timo Walther


Not sure if this is a duplicate, but as this issue pops up over and over again, 
it might be time to discuss it here and fix it.

Currently, configuration is spread across instances of {{Configuration}} and 
POJOs (e.g. {{ExecutionConfig}} or {{CheckpointConfig}}). This makes it very 
tricky to handle configuration throughout the stack. The practice has shown 
that configuration might be passed, layered, merged, restricted, copied, 
filtered, etc. This is easy with the config option stack but very tricky with 
the existing POJOs.

Many locations reveal the current shortcoming. For example, 
{{org.apache.flink.table.planner.delegation.DefaultExecutor}} has a 
{{isCheckpointingEnabled()}} method simply because we cannot trust the 
{{Configuration}} object that is passed around. Same for checking if object 
reuse is enabled.

A solution is still up for discussion. Ideally, we deprecate 
{{ExecutionConfig}} and {{CheckpointConfig}} and advocate a pure config option 
based approach. Alternatively, we could do a hybrid approach similar to 
`TableConfig` (that is backed by config options but has setters for 
convenience). The latter approach would cause fewer deprecations in the API.
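
A hedged sketch of what that hybrid style could look like (the class and its placement are illustrative only, not proposed API; `pipeline.object-reuse` is an existing option key):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Illustrative only: a POJO-style facade whose setters write through to a backing Configuration.
final class ConfigBackedExecutionConfig {

    private static final ConfigOption<Boolean> OBJECT_REUSE =
            ConfigOptions.key("pipeline.object-reuse").booleanType().defaultValue(false);

    private final Configuration backing = new Configuration();

    void enableObjectReuse() {
        backing.set(OBJECT_REUSE, true);
    }

    boolean isObjectReuseEnabled() {
        return backing.get(OBJECT_REUSE);
    }

    Configuration toConfiguration() {
        // layering/merging/filtering can now happen on plain Configuration objects
        return new Configuration(backing);
    }
}
```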



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29309) Relax allow-client-job-configurations for Table API and parameters

2022-09-15 Thread Timo Walther (Jira)
Timo Walther created FLINK-29309:


 Summary: Relax allow-client-job-configurations for Table API and 
parameters
 Key: FLINK-29309
 URL: https://issues.apache.org/jira/browse/FLINK-29309
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Timo Walther
Assignee: Timo Walther


Currently, the {{execution.allow-client-job-configurations}} is a bit too 
strict. Due to the complexity of the configuration stack, it makes it 
impossible to use Table API and also prevents very common parameters like 
{{pipeline.global-job-parameters}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29267) Support external type systems in DDL

2022-09-12 Thread Timo Walther (Jira)
Timo Walther created FLINK-29267:


 Summary: Support external type systems in DDL
 Key: FLINK-29267
 URL: https://issues.apache.org/jira/browse/FLINK-29267
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile), Table SQL / Ecosystem
Reporter: Timo Walther
Assignee: Timo Walther


Many connectors and formats require supporting external data types. Postgres 
users request UUID support, Avro users require enum support, etc.

FLINK-19869 implemented support for Postgres UUIDs poorly and even impacts 
pipelines with regular strings.

The long-term solution should be user-defined types in Flink. This is however a 
bigger effort that requires a FLIP and a bigger amount of resources.

As a mid-term solution, we should offer a consistent approach based on DDL 
options that allows to define a mapping from Flink type system to the external 
type system. I suggest the following:

{code}
CREATE TABLE MyTable (
...
) WITH(
  'mapping.data-types' = ': '
)
{code}

The mapping defines a map from Flink data type to external data type. The 
external data type should be string parsable. This works for most connectors 
and formats (e.g. Avro schema string).


Examples:

{code}
CREATE TABLE MyTable (
  regular_col STRING,
  uuid_col STRING,
  point_col ARRAY,
  box_col ARRAY>
) WITH(
  'mapping.data-types' = 'uuid_col: uuid, point_col: point, box_col: box'
)
{code}

We provide a table of supported mapping data types. E.g. the {{point}} type is 
always mapped to {{ARRAY}}. In general we choose a data type in Flink 
that comes closest to the required functionality.
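
For illustration, a connector or format factory could parse the proposed option value into a simple map; a sketch under the assumption that the value is a comma-separated list of "column: external-type" pairs as in the example above:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: turns "uuid_col: uuid, point_col: point" into {uuid_col=uuid, point_col=point}.
final class DataTypeMappingParser {

    static Map<String, String> parse(String optionValue) {
        return Arrays.stream(optionValue.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toMap(
                        entry -> entry.split(":", 2)[0].trim(),   // column name
                        entry -> entry.split(":", 2)[1].trim())); // external type name
    }
}
```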


Future work:

In theory, we can also offer mapping of field names. It might be a requirement 
that Flink's column name is different from the external system's one. 

{code}
CREATE TABLE MyTable (
...
) WITH(
  'mapping.names' = ': '
)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[ANNOUNCE] New Apache Flink PMC Member - Martijn Visser

2022-09-09 Thread Timo Walther

Hi everyone,

I'm very happy to announce that Martijn Visser has joined the Flink PMC!

Martijn has helped the community in many different ways over the past 
months. Externalizing the connectors from the Flink repo to their own 
repository, continously updating dependencies, and performing other 
project-wide refactorings. He is constantly coordinating contributions, 
connecting stakeholders, finding committers for contributions, driving 
release syncs, and helping in making the ASF a better place (e.g. by 
using Matomo, an ASF-compliant tracking solution, for all projects).


Congratulations and welcome, Martijn!

Cheers,
Timo Walther
(On behalf of the Apache Flink PMC)


Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

2022-08-26 Thread Timo Walther

Hi Ran,

so if I understand it correctly, the problem here is not only backward 
compatibility but also forward compatibility. You might run different 
versions of your connector some of them offer a metadata key A and some 
don't offer it yet. But the DDL should work for both connector 
implementations, right?


What I could imagine here is that we implement the DEFAULT constraint. A 
DDL could then look like this:


CREATE TABLE x (
  col_s0 STRING METADATA DEFAULT NULL,
  col_s1 STRING METADATA DEFAULT "unknown",
  col_s2 STRING DEFAULT "unknown",
)

A first version could only support metadata columns. But it would be 
more consistent to implement it for all column types from the very 
beginning. col_s0 is a special case.


What do you think?

Regards,
Timo


On 26.08.22 10:28, Jark Wu wrote:

Hi Ran,

If the metadata is from the message properties, then you can manually cast it 
to your preferred types,
such as `my_dyanmic_meta AS CAST(properties['my-new-property’] AS TIMESTAMP)`.

If the metadata is not from the message properties, how does the connector know 
which field to convert from?
Shouldn’t the connector be modified to support this new metadata column?

Best,
Jark




On Aug 26, 2022, at 3:30 PM, Ran Tao wrote:

Hi, TiMo. I think using one map column in the debezium format you
illustrated above can't cover the discussed scenario.
It's not the same thing.

Here is a debezium format example from flink docs: [1]

```
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
   origin_properties MAP<STRING, STRING> METADATA FROM
'value.source.properties' VIRTUAL,
  user_id BIGINT,
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json'
  ...
);
```

The `origin_properties` column is used for properties, so we define it as a
MAP (just like you suggested). But the other metadata columns have their
own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten all
metadata columns into one MAP column; it's not a good idea.

My suggestion: suppose the Kafka connector above adds some new metadata
(just for example; Kafka may be stable, but a certain connector or
middleware might still be evolving, so its metadata could be added or
changed), e.g. at some point Kafka added a `host_name` metadata key
(indicating the address of the message broker).

We can define sql like this:
```
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  host_name STRING METADATA VIRTUAL DYNAMIC,
   origin_properties MAP<STRING, STRING> METADATA FROM
'value.source.properties' VIRTUAL,
  user_id BIGINT,
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json'
  ...
);
```
Then users can use the `host_name` metadata: because it's a DYNAMIC
metadata column, Flink doesn't throw an exception even though `host_name`
did not belong to Kafka before, and developers don't need to modify or
rebuild the Flink source code and publish Flink to the online environment
(which comes at a high cost).

Considering the return value:
kafka before (no this metadata): null
kafka now (added this metadata already): the concrete value

The same user SQL works in the past, now, and even in the future, rather than
checking and rejecting these new metadata columns or frequently modifying the
connector implementation to support them. And it's opt-in, configured by using
'DYNAMIC' at the metadata column (or another, better mechanism).

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/

On Thu, Aug 25, 2022 at 9:07 PM Timo Walther wrote:


Hi Ran,

what would be the data type of this dynamic metadata column? The planner
and many parts of the stack will require a data type.

Personally, I feel connector developers can already have the same
functionality by declaring a metadata column as `MAP<STRING, STRING>`.
This is what we expose already as `debezium.source.properties`. Whatever
Debezium adds will be available through this property and can be
accessed via `SELECT col['my-new-property'] FROM x` including being NULL
by default if not present.

Regards,
Timo


On 25.08.22 14:04, Ran Tao wrote:

```
create table test_source(
  __test_metadata__ varchar METADATA,
  f0 varchar,
  f1 varchar,
  f2 bigint,
  ts as CURRENT_TIMESTAMP
) with(
  'connector'='test',
   ...
)
```

If we not pre define `__test_metadata__` as meta keys by implementing
listReadableMetadata, run the above sql, it will cause exception like

this:


org.apache.flink.table.api.ValidationException: Invalid metadata key
'__test_metadata__' in column '__test_metadata__' of table
'default_catalog.default_database.test_source'. The DynamicTableSource
class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
supports the following metadata keys for reading:
xxx, yyy

at


org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)


Because the current flink metadata column must exist in results returned

by

`listReadableMetadata`.  But when a certain

Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

2022-08-25 Thread Timo Walther

Hi Ran,

what would be the data type of this dynamic metadata column? The planner 
and many parts of the stack will require a data type.


Personally, I feel connector developers can already have the same 
functionality by declaring a metadata column as `MAP<STRING, STRING>`. 
This is what we expose already as `debezium.source.properties`. Whatever 
Debezium adds will be available through this property and can be 
accessed via `SELECT col['my-new-property'] FROM x` including being NULL 
by default if not present.


Regards,
Timo


On 25.08.22 14:04, Ran Tao wrote:

```
create table test_source(
  __test_metadata__ varchar METADATA,
  f0 varchar,
  f1 varchar,
  f2 bigint,
  ts as CURRENT_TIMESTAMP
) with(
  'connector'='test',
   ...
)
```

If we not pre define `__test_metadata__` as meta keys by implementing
listReadableMetadata, run the above sql, it will cause exception like this:

org.apache.flink.table.api.ValidationException: Invalid metadata key
'__test_metadata__' in column '__test_metadata__' of table
'default_catalog.default_database.test_source'. The DynamicTableSource
class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
supports the following metadata keys for reading:
xxx, yyy

at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)

Because a Flink metadata column currently must exist in the results returned by
`listReadableMetadata`, when a certain connector adds some metadata we cannot
use it directly unless we modify the connector code to support it. In some
situations this is intolerable. Can we support a 'DYNAMIC MetadataColumn'? Its
basic mechanism is to not check the column against the existing metadata, so
users can define it dynamically. If a certain connector does not have this
metadata, the column value will be null; otherwise it returns the concrete
value. This has great benefits in some scenarios.

Looking forward to your opinions.
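
For context, the validation mentioned above comes from the connector-side contract: a source declares its metadata keys roughly like in the sketch below (abridged; `xxx`/`yyy` mirror the made-up keys in the error message):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.types.DataType;

// Sketch: only the keys returned here pass validation today.
abstract class TestDynamicTableSource implements DynamicTableSource, SupportsReadingMetadata {

    @Override
    public Map<String, DataType> listReadableMetadata() {
        Map<String, DataType> metadata = new LinkedHashMap<>();
        metadata.put("xxx", DataTypes.STRING());
        metadata.put("yyy", DataTypes.STRING());
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // remember the requested keys and emit the corresponding values at runtime
    }
}
```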






[jira] [Created] (FLINK-29035) ExpressionReducer does not work with jar resources

2022-08-18 Thread Timo Walther (Jira)
Timo Walther created FLINK-29035:


 Summary: ExpressionReducer does not work with jar resources
 Key: FLINK-29035
 URL: https://issues.apache.org/jira/browse/FLINK-29035
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


It seems the code generation for expression reduction uses an invalid class 
loader that does not contain the jar resource.

Reproducible example:

{code}

CREATE TEMPORARY SYSTEM FUNCTION myLower AS '%s' USING JAR '%s'

SELECT myLower('HELLO')

{code}

 

fails with

{code}

java.lang.RuntimeException: Could not instantiate generated class 
'ExpressionReducer$4'

    at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
    at 
org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:97)
    at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:759)
    at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:699)
    at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:306)


Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.

Caused by: org.codehaus.commons.compiler.CompileException: Line 13, Column 37: 
Cannot determine simple type name "LowerUDF46"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

