Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2022-11-30 Thread Martijn Visser
Hi all,

A couple of first comments on this:
1. I'm missing the problem statement in the overall introduction. It
immediately goes into proposal mode; I would like to first read what the
actual problem is before diving into solutions.
2. "Each ETL job creates snapshots with checkpoint info on sink tables in
Table Store"  -> That reads like you're proposing that snapshots need to be
written to Table Store?
3. If you introduce a MetaService, it becomes a single point of failure
because it coordinates everything, but I can't find anything in the FLIP on
making the MetaService highly available or how to deal with failovers there.
4. The FLIP states under Rejected Alternatives "Currently watermark in
Flink cannot align data." which is not true, given that there is FLIP-182
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

5. Given the MetaService role, it feels like this is introducing a tight
dependency between Flink and the Table Store. How pluggable is this
solution, given the changes that need to be made to Flink in order to
support this?
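To illustrate point 4: with FLIP-182, sources in an alignment group are paused once their watermark drifts more than the configured maximum past the slowest member (via `WatermarkStrategy#withWatermarkAlignment`, available since Flink 1.15). A plain-Python sketch of that rule — source names and values here are purely illustrative, not from the FLIP:

```python
from datetime import timedelta

def paused_sources(watermarks_ms, max_drift):
    """Return the sources whose watermark drifts more than max_drift past
    the slowest source in the alignment group (the FLIP-182 idea)."""
    group_min = min(watermarks_ms.values())
    limit_ms = group_min + max_drift / timedelta(milliseconds=1)
    return {name for name, wm in watermarks_ms.items() if wm > limit_ms}

# Illustrative values: source-b is 25s ahead of source-a, drift budget is 20s.
wms = {"source-a": 1_000, "source-b": 26_000}
print(paused_sources(wms, timedelta(seconds=20)))  # -> {'source-b'}
```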

Best regards,

Martijn


On Thu, Dec 1, 2022 at 4:49 AM Shammon FY  wrote:

> Hi devs:
>
> I'd like to start a discussion about FLIP-276: Data Consistency of
> Streaming and Batch ETL in Flink and Table Store[1]. In the whole data
> stream processing, there are consistency problems such as: how to manage the
> dependencies of multiple jobs and tables, how to define and handle E2E
> delays, and how to ensure the data consistency of queries on flowing data.
> This FLIP aims to support data consistency and answer these questions.
>
> I've discussed the details of this FLIP with @Jingsong Lee and @libenchao
> offline several times. We hope to support data consistency of queries on
> tables, manage the relationships between Flink jobs and tables, and revise
> tables on streams in Flink and Table Store to improve the whole data
> stream processing.
>
> Looking forward to your feedback.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
>
>
> Best,
> Shammon
>


Re: [VOTE] FLIP-273: Improve Catalog API to Support ALTER TABLE syntax

2022-11-30 Thread Jingsong Li
+1 (binding)

Thanks Shengkai for driving this FLIP.

Best,
Jingsong

On Thu, Dec 1, 2022 at 12:20 PM Shengkai Fang  wrote:
>
> Hi All,
>
> Thanks for all the feedback so far. Based on the discussion[1] we seem
> to have a consensus, so I would like to start a vote on FLIP-273.
>
> The vote will last for at least 72 hours (Dec 5th at 13:00 GMT,
> excluding weekend days) unless there is an objection or insufficient votes.
>
> Best,
> Shengkai
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-273%3A+Improve+the+Catalog+API+to+Support+ALTER+TABLE+syntax
> [2] https://lists.apache.org/thread/2v4kh2bpzvk049zdxb687q7o1pcmnnnw


[jira] [Created] (FLINK-30256) LocalWindowAgg can set the chaining strategy to always

2022-11-30 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-30256:
-

 Summary: LocalWindowAgg can set the chaining strategy to always
 Key: FLINK-30256
 URL: https://issues.apache.org/jira/browse/FLINK-30256
 Project: Flink
  Issue Type: Improvement
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] FLIP-273: Improve Catalog API to Support ALTER TABLE syntax

2022-11-30 Thread Shengkai Fang
Hi All,

Thanks for all the feedback so far. Based on the discussion[1] we seem
to have a consensus, so I would like to start a vote on FLIP-273.

The vote will last for at least 72 hours (Dec 5th at 13:00 GMT,
excluding weekend days) unless there is an objection or insufficient votes.

Best,
Shengkai

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-273%3A+Improve+the+Catalog+API+to+Support+ALTER+TABLE+syntax
[2] https://lists.apache.org/thread/2v4kh2bpzvk049zdxb687q7o1pcmnnnw


[DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2022-11-30 Thread Shammon FY
Hi devs:

I'd like to start a discussion about FLIP-276: Data Consistency of
Streaming and Batch ETL in Flink and Table Store[1]. In the whole data
stream processing, there are consistency problems such as: how to manage the
dependencies of multiple jobs and tables, how to define and handle E2E
delays, and how to ensure the data consistency of queries on flowing data.
This FLIP aims to support data consistency and answer these questions.

I've discussed the details of this FLIP with @Jingsong Lee and @libenchao
offline several times. We hope to support data consistency of queries on
tables, manage the relationships between Flink jobs and tables, and revise
tables on streams in Flink and Table Store to improve the whole data
stream processing.

Looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store


Best,
Shammon


[jira] [Created] (FLINK-30255) Throw exception when upper case fields are used in Hive metastore

2022-11-30 Thread Shammon (Jira)
Shammon created FLINK-30255:
---

 Summary: Throw exception when upper case fields are used in Hive
metastore
 Key: FLINK-30255
 URL: https://issues.apache.org/jira/browse/FLINK-30255
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.3.0
Reporter: Shammon


Currently there is an incompatibility when users use upper case field names in
the Hive metastore and Table Store. We should throw an exception for this and
find a more elegant compatibility mode later.





[DISCUSS] OLM Bundles for Flink Kubernetes Operator

2022-11-30 Thread Hao t Chang
Hi Gyula,

This is a good model. I will prepare a PR. Jim and I will help with the
verification and release process.

--
Best,
Ted Chang | Software Engineer | htch...@us.ibm.com



Re: [DISCUSS] OLM Bundles for Flink Kubernetes Operator

2022-11-30 Thread Gyula Fóra
Hi All!

I think for this first version let's go with the following simple model:

1. We include the OLM bundle generation script in the operator git repo
before the release
2. We add one extra release verification step to the wiki with detailed
instructions on generating and verifying the bundle
3. Once the release is done, we generate the bundle from the release helm
chart and publish it.

This is a relatively lightweight process with minimal burden on the release
managers and hopefully Ted can help out with the verification and release
process!

What do you think?

If there are no major objections, it would be great to have a PR this week
to get it in before the feature freeze.

Gyula

On Tue, Nov 29, 2022 at 3:37 PM Őrhidi Mátyás 
wrote:

> Hi folks,
>
> I'm going to shepherd the upcoming 1.3.0 release. I'll also try to find
> some time to review the OLM integration proposal, and see how much extra
> effort it would be.
>
> Best,
> Matyas
>
> On Wed, Nov 23, 2022 at 5:45 PM Yang Wang  wrote:
>
> > Improving the visibility of Flink Kubernetes Operator is great. And I
> agree
> > OLM could help with this.
> >
> > I just hope this will not make the whole release process too complicated.
> > Of course, if we want to integrate the OLM into the official release, it
> > should be tested by the users easily.
> >
> > Best,
> > Yang
> >
> > Gyula Fóra  于2022年11月24日周四 00:29写道:
> >
> > > Ted, Jim:
> > >
> > > When we create the RC bundle (jars, sources, helm chart) we execute the
> > > following steps:
> > >  1. Push the RC tag to git -> this will generate and publish an image
> > with
> > > the RC git commit tag to ghcr.io
> > >  2. We bake into the helm chart the RC tag as the image tag
> > >  3. We create the source and helm bundle, then publish it
> > >
> > > In step 3 we also need to add the OLM bundle creation and we can bake
> in
> > > the same ghcr.io image tag.
> > >
> > > Gyula
> > >
> > > On Wed, Nov 23, 2022 at 7:13 AM Jim Busche  wrote:
> > >
> > > > I'm curious how the RC automation works now - is it fully automatic?
> > > > For example, a RC Debian image gets created, something like:
> > > > ghcr.io/apache/flink-kubernetes-operator:95128bf and is pushed
> > > > to ghcr.io … then that's used in the rc helm chart?
> > > >
> > > > If that's all automated, then that rc candidate operator image value
> > must
> > > > be stored as a variable, and could be utilized to build the OLM
> bundle
> > as
> > > > well with the same rc operator image.  Then the bundle and catalog
> > could
> > > be
> > > > pushed to ghcr.io for testing.
> > > >
> > > >
> > > >
> > > > If it's not automated, then in the manual steps, there could a few
> > steps
> > > > added to set the rc operator image value prior to running the bundle
> > > > creation, then manually pushing the bundle and catalog to ghcr.io
> for
> > > > testing.
> > > >
> > > >
> > > > Thanks, Jim
> > > > --
> > > > James Busche | Sr. Software Engineer, Watson AI and Data Open
> > Technology
> > > |
> > > > 408-460-0737 | jbus...@us.ibm.com
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > From: Hao t Chang 
> > > > Date: Tuesday, November 22, 2022 at 2:55 PM
> > > > To: dev@flink.apache.org 
> > > > Subject: [EXTERNAL] [DISCUSS] OLM Bundles for Flink Kubernetes
> Operator
> > > > Hi Gyula,
> > > >
> > > > Agree, we should include the bundle file and let community inspect
> them
> > > in
> > > > the staging repo. In addition, people can do a few things to test the
> > > > bundle files.
> > > > 1. Run CI test suites (
> > > > https://github.com/tedhtchang/olm#run-ci-test-suits-before-creating-pr
> > > > ) with the bundle files directly.
> > > > 2. Deploy operator with OLM (requires the bundle image in a
> > > > registry)
> > > > 3. Test operators upgrade from the previous version with OLM
> > > > (requires both bundle and catalog image in a registry)
> > > >
> > > > For 2 and 3, it’s better to build a bundle and catalog images as part
> > of
> > > > the staging. For example, during the next release (1.3.0-rc1),
> > > > temporarily
> > > > push the 2 images to
> > > > ghcr.io/apache/flink-kubernetes-operator-bundle:1.3.0-rc1 and
> > > > ghcr.io/apache/flink-kubernetes-opeator-catalog:1.3.0-rc1. Then,
> > > > community can test 2. and 3. easily with the following commands:
> > > > # Deploy the catalog src in default ns
> > > > cat <<EOF | kubectl apply -f -
> > > > apiVersion: operators.coreos.com/v1alpha1
> > > > kind: CatalogSource
> > > > metadata:
> > > >   name: olm-flink-operator-catalog
> > > >   namespace: default
> > > > spec:
> > > >   sourceType: grpc
> > > >   image: ghcr.io/apache/flink-kubernetes-opeator-catalog:1.3.0-rc1
> > > > EOF
> > > >
> > > > # Deploy operator from the catalog
> > > > cat <<EOF | kubectl apply -f -
> > > > apiVersion: operators.coreos.com/v1alpha2
> > > > kind: OperatorGroup
> > > > metadata:
> > > >   name: default-og
> > 

Re: [DISCUSS] FLIP-275: Support Remote SQL Client Based on SQL Gateway

2022-11-30 Thread Jim Hughes
Hi Yu,

Thanks for moving my comments to this thread!  Also, thank you for
answering my questions; it is helping me understand the SQL Gateway
better.

5.
> Our idea is to introduce a new session option (like
> 'sql-client.result.fetch-interval') to control the fetching requests
> sending frequency. What do you think?

Should this configuration be mentioned in the FLIP?

One slight concern I have with having 'sql-client.result.fetch-interval' as
a session configuration is that users could set it low and cause the client
to send a large volume of requests to the SQL gateway.

Generally, I'd like to see some way for the server to be able to limit the
number of requests it receives.  If that really needs to be done by a proxy
in front of the SQL gateway, that is fine as well.  (To be clear, I don't
think my concern here should be blocking in any way.)
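As a sketch of what such limiting could look like on the gateway (or proxy) side — purely illustrative, nothing like this is in the FLIP, and the class and parameter names are invented — a small per-session token bucket:

```python
import time

class TokenBucket:
    """Minimal request limiter sketch: allow at most `burst` requests at once,
    refilling at rate_per_s tokens per second. Hypothetical, not FLIP content."""
    def __init__(self, rate_per_s, burst):
        self.rate = rate_per_s
        self.capacity = burst
        self.tokens = float(burst)
        self.last = time.monotonic()

    def allow(self):
        # Refill proportionally to elapsed time, capped at the bucket capacity.
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate_per_s=100, burst=2)
print([bucket.allow() for _ in range(3)])  # typically the burst of 2, then a rejection
```

The gateway would consult `allow()` before serving each fetch request for a session and respond with a "retry later" status when it returns False.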

7.
> What is the serialization lifecycle for results?

I wonder if two other options are possible:
3) Could the Gateway just forward the result byte array?  (Or does the
Gateway need to deserialize the response in order to understand it for some
reason?)
4) Could the JobManager prepare the results in JSON?  (Or similarly could
the Client read the format which the JobManager sends?)

Thanks again!

Cheers,

Jim

On Wed, Nov 30, 2022 at 9:40 AM yu zelin  wrote:

> Hi, all
>
> Thanks for Jim's questions below. Here I'd like to reply to them.
>
> >   1. For the Client Parser, is it going to work with the extended syntax
> >   from the Flink Table Store?
> >
> >   2. Relatedly, what will happen if an older Client tries to handle
> syntax
> >   that a newer service supports?  (Suppose I use a 1.17 client with a
> 1.18
> >   Gateway/system which has a new keyword.  Is there anything we should be
> >   designing for upfront?)
> >
> >   3. How will client and server version mismatches be handled?  Will a
> >   single gateway be able to support multiple endpoint versions?
> >   4. How are commands which change a session handled?  Are those sent via
> >   an ExecuteStatementRequest?
> >
> >   5. The remote POC uses polling for getting back status and getting back
> >   results.  Would it be possible to switch to web sockets or some other
> >   mechanism to avoid polling?  If polling is used for both, the polling
> >   frequency should be different between local and remote configurations.
> >
> >   6. What does this sentence mean?  "The reason why we didn't get the sql
> >   type in client side is because it's hard for the lightweight
> client-level
> >   parser to recognize some sql type  sql, such as query with CTE.  "
> >
> >   7. What is the serialization lifecycle for results?  It makes sense to
> >   have some control over whether the gateway returns results as SQL or
> JSON.
> >   I'd love to see a way to avoid needing to serialize and deserialize
> results
> >   on the SQL Gateway if possible.  I'm still new enough to the project
> that
> >   I'm not sure if that's readily possible.  Maybe the SQL Gateway's
> return
> >   type can be sent as part of the request so that the JobManager can send
> >   back results in an advantageous format?
> >
> >   8. Does ErrorType need to be marked as @PublicEvolving?
> >
> > I'm excited for the SQL client to support gateway mode!  Given the change
> > in design, do you think it'll still be part of the Flink 1.17 release?
>
> 1. ClientParser can work with new (and unknown) SQL syntax, because if the
> SQL type is not recognized, the SQL is submitted to the gateway directly.
>
> For more information: the proposed ClientParser only does two things:
> (1) It tells client commands (help, clear, etc.) and SQL statements apart.
> (2) It parses several SQL types (e.g., for a SHOW CREATE statement we can
> print a raw string instead of a table). Here the recognition of SQL types
> mostly affects the print style, and unrecognized SQL can still be submitted
> to the cluster. So a client with the new ClientParser remains compatible
> with new syntax.
>
> 2. First, I'd like to explain that the gateway APIs and the supported
> syntax are two different things. For example, 'configureSession' and
> 'completeStatement' are APIs. As mentioned in #1, SQL statements whose
> syntax is unknown will be submitted to the gateway, and whether they can
> be executed normally depends on whether the execution environment supports
> the syntax.
>
> > Is there anything we should be designing for upfront?
>
> The 'SqlGatewayRestAPIVersion’ has been introduced. But it is for sql
> gateway APIs.
>
> 3.
> > How will client and server version mismatches be handled?
>
> A lower version client can work with a higher version gateway because the
> old interfaces won't be deleted. When a higher version client connects to a
> lower version gateway, the client should notify the users if they try to use
> unsupported features. For example, the client start option '-i' means using
> an initialization file to initialize the session.
> We plan to use 

[jira] [Created] (FLINK-30254) Sync Pulsar updates to external Pulsar connector repository

2022-11-30 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30254:
--

 Summary: Sync Pulsar updates to external Pulsar connector 
repository
 Key: FLINK-30254
 URL: https://issues.apache.org/jira/browse/FLINK-30254
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Martijn Visser
Assignee: Martijn Visser


Currently the external Pulsar repository contains the code from the 
{release-1.16} branch. This should be synced with the changes that have been 
merged into {master} since then. 





Re: SQL Gateway and SQL Client

2022-11-30 Thread yu zelin
Hi, Jim,

Thanks for your feedback, your suggestions are very good. I have replied in the 
discussion
mail, please take a look.

> 2022年11月29日 03:18,Jim Hughes  写道:
> 
> Hi Shengkai, Yu,
> 
> Thanks for the FLIP!  I have had a chance to read it, and it looks good.  I
> do have some questions:
> 
> I do like the idea of unifying the approaches so that the code doesn't get
> out of step.
> 
>   1. For the Client Parser, is it going to work with the extended syntax
>   from the Flink Table Store?
> 
>   2. Relatedly, what will happen if an older Client tries to handle syntax
>   that a newer service supports?  (Suppose I use a 1.17 client with a 1.18
>   Gateway/system which has a new keyword.  Is there anything we should be
>   designing for upfront?)
> 
>   3. How will client and server version mismatches be handled?  Will a
>   single gateway be able to support multiple endpoint versions?
>   4. How are commands which change a session handled?  Are those sent via
>   an ExecuteStatementRequest?
> 
>   5. The remote POC uses polling for getting back status and getting back
>   results.  Would it be possible to switch to web sockets or some other
>   mechanism to avoid polling?  If polling is used for both, the polling
>   frequency should be different between local and remote configurations.
> 
>   6. What does this sentence mean?  "The reason why we didn't get the sql
>   type in client side is because it's hard for the lightweight client-level
>   parser to recognize some sql type  sql, such as query with CTE.  "
> 
>   7. What is the serialization lifecycle for results?  It makes sense to
>   have some control over whether the gateway returns results as SQL or JSON.
>   I'd love to see a way to avoid needing to serialize and deserialize results
>   on the SQL Gateway if possible.  I'm still new enough to the project that
>   I'm not sure if that's readily possible.  Maybe the SQL Gateway's return
>   type can be sent as part of the request so that the JobManager can send
>   back results in an advantageous format?
> 
>   8. Does ErrorType need to be marked as @PublicEvolving?
> 
> I'm excited for the SQL client to support gateway mode!  Given the change
> in design, do you think it'll still be part of the Flink 1.17 release?
> 
> Cheers,
> 
> Jim
> 
> On Sun, Nov 27, 2022 at 8:54 PM Shengkai Fang  wrote:
> 
>> Hi, Jim and Alexey.
>> 
>> We have written the proposal[1]. It would be appreciated if you can give us
>> some feedback.
>> 
>> Best,
>> Shengkai
>> 
>> [1]
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-275%3A+Support+Remote+SQL+Client+Based+on+SQL+Gateway
>> 
>> yu zelin  于2022年11月24日周四 12:06写道:
>> 
>>> Hi Jim,
>>> Sorry for incorrect message in last reply.
 Shengkai will help to your PR
>>> I mean Shengkai will help to review the PRs. And I will add some of your
>>> suggestion to my design.
>>> 
>>> I think the POC code will be cherry-picked to my new design of SQL
>> Client.
 2022年11月23日 12:18,yu zelin  写道:
 
 Hi Jim,
 Sorry for late response. Just another busy week :)
 Last week, I’ve discussed within my team about my design. My teammates
>>> think it’s better to unify the local and remote mode, so I’ve
>> investigated
>>> and redesigned a new plan. I’ll inform you after the rewriting of FLIP
>>> finished (will be soon) and Shengkai will help to your PR.
 
 Best,
 
 Yu Zelin
 
> 2022年11月22日 02:59,Jim Hughes  写道:
> 
> Hi Yu, Shengkai,
> 
> As a quick update, I've had a chance to try out Yu's POC and it is
>>> working
> for me.  (Admittedly, I haven't tried it too extensively; I only tried
> basic operations.)
> 
> From my experiments, I did leave a few comments on
> https://github.com/apache/flink/pull/20958.
> 
> Overall, the PRs I see look pretty good.  Are they going to be merged
> soon?  Anything else I can do to help?
> 
> Cheers,
> 
> Jim
 
>>> 
>>> 
>> 



[RESULT][VOTE] FLIP-271: Autoscaling

2022-11-30 Thread Maximilian Michels
The vote has been closed. Thank you for your votes. Here is the result:

Binding votes (9 in total):

- Gyula Fóra
- Őrhidi Mátyás
- Yang Wang
- Rui Fan
- Dong Lin
- Thomas Weise
- Marton Balassi
- Yun Tang
- Maximilian Michels

Non-binding votes (6 in total):

- Rui Fan
- Zheng Yu Chen
- Jiangang Liu
- Chenya Zhang
- Ferenc Csaky
- Mason Chen

According to the Flink Bylaws [1], consensus + three committer votes are
required to accept a FLIP. This requirement has been reached via 9 binding
votes (PMC + committer).

Thanks,
Max

[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Actions

On Wed, Nov 30, 2022 at 2:24 PM Yun Tang  wrote:

> + 1 (binding)
>
> Best
> Yun Tang
> 
> From: Mason Chen 
> Sent: Wednesday, November 30, 2022 12:52
> To: dev@flink.apache.org 
> Subject: Re: [VOTE] FLIP-271: Autoscaling
>
> +1 (non-binding)
>
> On Tue, Nov 29, 2022 at 11:55 AM Ferenc Csaky 
> wrote:
>
> > +1 (non-binding)
> >
> >
> >
> >
> > --- Original Message ---
> > On Tuesday, November 29th, 2022 at 15:39, Márton Balassi <
> > balassi.mar...@gmail.com> wrote:
> >
> >
> > >
> > >
> > > +1 (binding)
> > >
> > > On Tue, Nov 29, 2022 at 6:13 AM Chenya Zhang
> chenyazhangche...@gmail.com
> > >
> > > wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > On Sun, Nov 27, 2022 at 5:49 PM Jiangang Liu
> liujiangangp...@gmail.com
> > > > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best,
> > > > > Jiangang Liu
> > > > >
> > > > > Thomas Weise t...@apache.org 于2022年11月28日周一 06:23写道:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > On Sat, Nov 26, 2022 at 8:11 AM Zheng Yu Chen
> jam.gz...@gmail.com
> > > > > > wrote:
> > > > > >
> > > > > > > +1(no-binding)
> > > > > > >
> > > > > > > Maximilian Michels m...@apache.org 于 2022年11月24日周四 上午12:25写道:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I'd like to start a vote for FLIP-271 [1] which we previously
> > > > > > > > discussed
> > > > > > > > on
> > > > > > > > the dev mailing list [2].
> > > > > > > >
> > > > > > > > I'm planning to keep the vote open for at least until
> Tuesday,
> > Nov
> > > > > > > > 29.
> > > > > > > >
> > > > > > > > -Max
> > > > > > > >
> > > > > > > > [1]
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> > > >
> > > > > > > > [2]
> > > > > > > >
> > https://lists.apache.org/thread/pvfb3fw99mj8r1x8zzyxgvk4dcppwssz
> >
>


Re: [DISCUSS] FLIP-275: Support Remote SQL Client Based on SQL Gateway

2022-11-30 Thread yu zelin
Hi, all

Thanks for Jim's questions below. Here I'd like to reply to them.

>   1. For the Client Parser, is it going to work with the extended syntax
>   from the Flink Table Store?
> 
>   2. Relatedly, what will happen if an older Client tries to handle syntax
>   that a newer service supports?  (Suppose I use a 1.17 client with a 1.18
>   Gateway/system which has a new keyword.  Is there anything we should be
>   designing for upfront?)
> 
>   3. How will client and server version mismatches be handled?  Will a
>   single gateway be able to support multiple endpoint versions?
>   4. How are commands which change a session handled?  Are those sent via
>   an ExecuteStatementRequest?
> 
>   5. The remote POC uses polling for getting back status and getting back
>   results.  Would it be possible to switch to web sockets or some other
>   mechanism to avoid polling?  If polling is used for both, the polling
>   frequency should be different between local and remote configurations.
> 
>   6. What does this sentence mean?  "The reason why we didn't get the sql
>   type in client side is because it's hard for the lightweight client-level
>   parser to recognize some sql type  sql, such as query with CTE.  "
> 
>   7. What is the serialization lifecycle for results?  It makes sense to
>   have some control over whether the gateway returns results as SQL or JSON.
>   I'd love to see a way to avoid needing to serialize and deserialize results
>   on the SQL Gateway if possible.  I'm still new enough to the project that
>   I'm not sure if that's readily possible.  Maybe the SQL Gateway's return
>   type can be sent as part of the request so that the JobManager can send
>   back results in an advantageous format?
> 
>   8. Does ErrorType need to be marked as @PublicEvolving?
> 
> I'm excited for the SQL client to support gateway mode!  Given the change
> in design, do you think it'll still be part of the Flink 1.17 release?

1. ClientParser can work with new (and unknown) SQL syntax, because if the
SQL type is not recognized, the SQL is submitted to the gateway directly.

For more information: the proposed ClientParser only does two things:
(1) It tells client commands (help, clear, etc.) and SQL statements apart.
(2) It parses several SQL types (e.g., for a SHOW CREATE statement we can
print a raw string instead of a table). Here the recognition of SQL types
mostly affects the print style, and unrecognized SQL can still be submitted
to the cluster. So a client with the new ClientParser remains compatible with
new syntax.
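A rough sketch of that classification behavior — the command names and patterns here are illustrative, not the FLIP's actual grammar:

```python
import re

# Hypothetical command set and pattern; the real ClientParser may differ.
CLIENT_COMMANDS = {"help", "clear", "quit"}
PRINT_RAW_PATTERN = re.compile(r"^\s*show\s+create\b", re.IGNORECASE)

def classify(statement):
    """Classify client input; anything unrecognized falls through to the gateway."""
    text = statement.strip().rstrip(";").strip()
    if text.lower() in CLIENT_COMMANDS:
        return "client-command"
    if PRINT_RAW_PATTERN.match(text):
        return "sql-print-raw"          # print the result as a raw string
    return "sql-submit-to-gateway"      # unknown syntax still goes to the gateway

assert classify("HELP;") == "client-command"
assert classify("SHOW CREATE TABLE t") == "sql-print-raw"
# New/unknown syntax (e.g. Table Store extensions) is still forwarded:
assert classify("CALL some_new_procedure()") == "sql-submit-to-gateway"
```

The key property is the final fall-through: a client built this way never rejects syntax it does not know.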

2. First, I'd like to explain that the gateway APIs and the supported syntax
are two different things. For example, 'configureSession' and
'completeStatement' are APIs. As mentioned in #1, SQL statements whose syntax
is unknown will be submitted to the gateway, and whether they can be executed
normally depends on whether the execution environment supports the syntax.

> Is there anything we should be designing for upfront?

The 'SqlGatewayRestAPIVersion' has been introduced, but it is only for the
SQL gateway APIs.

3. 
> How will client and server version mismatches be handled?

A lower version client can work with a higher version gateway because the
old interfaces won't be deleted. When a higher version client connects to a
lower version gateway, the client should notify the users when they try to use
unsupported features. For example, the client start option '-i' means using an
initialization file to initialize the session. We plan to use the gateway's
'configureSession' API to implement it, but this API is not implemented in the
1.16 gateway (SqlGatewayRestAPIVersion = V1). So if the user tries to use the
'-i' option to start the client against a 1.16 gateway, the client should tell
the user that the '-i' option can't be executed with a gateway whose version
is lower than V2.

>  Will a single gateway be able to support multiple endpoint versions?

Currently, the gateway only starts the highest version endpoint, and a higher
version endpoint is compatible with the lower version endpoint's protocol.

4. Yes. Mostly, we use 'SET' and 'RESET' statements to change the session
configuration.
Note: the client can't switch sessions (I mean, close the current session and
open another one). I'm not sure whether you need to change the session itself?

5. 
>  Would it be possible to switch to web sockets or some other mechanism to 
> avoid polling?

Your suggestion is very good, but this FLIP is focused on supporting the
remote client. How about taking it up as future work?

> If polling is used for both, the polling frequency should be different
> between local and remote configurations.

Our idea is to introduce a new session option (like
'sql-client.result.fetch-interval') to control how frequently fetch requests
are sent. What do you think?
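As an illustration of how such an option could drive the client's fetch loop — the option name is the proposed one, everything else (function names, the stubbed gateway call) is invented for the sketch:

```python
import time

def fetch_results(fetch_once, fetch_interval_s, max_polls):
    """Poll the gateway until it reports end-of-stream, sleeping
    fetch_interval_s ('sql-client.result.fetch-interval') between polls."""
    rows = []
    for _ in range(max_polls):
        batch, end_of_stream = fetch_once()  # stand-in for the REST fetch call
        rows.extend(batch)
        if end_of_stream:
            return rows
        time.sleep(fetch_interval_s)
    raise TimeoutError("gave up polling the gateway")

# Stub gateway: two batches, then end-of-stream.
batches = iter([(["row1"], False), (["row2", "row3"], True)])
print(fetch_results(lambda: next(batches), fetch_interval_s=0.01, max_polls=10))
# -> ['row1', 'row2', 'row3']
```

A larger interval directly reduces the request volume the gateway sees, which also speaks to the rate-limiting concern raised earlier in the thread.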

For more information: we are inclined to keep the polling behavior in this 
version. For streaming 
query, fetching results synchronously may occupy 

Re: [VOTE] FLIP-271: Autoscaling

2022-11-30 Thread Maximilian Michels
+1 (binding)

I hereby close the vote. I'll send the result in a separate email with the
subject "[RESULT][VOTE] FLIP-271: Autoscaling"

Thanks,
Max

On Wed, Nov 30, 2022 at 2:24 PM Yun Tang  wrote:

> + 1 (binding)
>
> Best
> Yun Tang
> 
> From: Mason Chen 
> Sent: Wednesday, November 30, 2022 12:52
> To: dev@flink.apache.org 
> Subject: Re: [VOTE] FLIP-271: Autoscaling
>
> +1 (non-binding)
>
> On Tue, Nov 29, 2022 at 11:55 AM Ferenc Csaky 
> wrote:
>
> > +1 (non-binding)
> >
> >
> >
> >
> > --- Original Message ---
> > On Tuesday, November 29th, 2022 at 15:39, Márton Balassi <
> > balassi.mar...@gmail.com> wrote:
> >
> >
> > >
> > >
> > > +1 (binding)
> > >
> > > On Tue, Nov 29, 2022 at 6:13 AM Chenya Zhang
> chenyazhangche...@gmail.com
> > >
> > > wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > On Sun, Nov 27, 2022 at 5:49 PM Jiangang Liu
> liujiangangp...@gmail.com
> > > > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best,
> > > > > Jiangang Liu
> > > > >
> > > > > Thomas Weise t...@apache.org 于2022年11月28日周一 06:23写道:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > On Sat, Nov 26, 2022 at 8:11 AM Zheng Yu Chen
> jam.gz...@gmail.com
> > > > > > wrote:
> > > > > >
> > > > > > > +1(no-binding)
> > > > > > >
> > > > > > > Maximilian Michels m...@apache.org 于 2022年11月24日周四 上午12:25写道:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I'd like to start a vote for FLIP-271 [1] which we previously
> > > > > > > > discussed
> > > > > > > > on
> > > > > > > > the dev mailing list [2].
> > > > > > > >
> > > > > > > > I'm planning to keep the vote open for at least until
> Tuesday,
> > Nov
> > > > > > > > 29.
> > > > > > > >
> > > > > > > > -Max
> > > > > > > >
> > > > > > > > [1]
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> > > >
> > > > > > > > [2]
> > > > > > > >
> > https://lists.apache.org/thread/pvfb3fw99mj8r1x8zzyxgvk4dcppwssz
> >
>


Re: [jira] [Created] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2022-11-30 Thread Galen Warren
Thanks Fabian.

Do you know if the legacy StreamingFileSink has the same issues? Not asking
you to do research but just wondering if you happen to know.


On Tue, Nov 29, 2022 at 8:46 AM Fabian Paul  wrote:

> Hi folks,
>
> I did some initial investigation, and the problem seems twofold.
>
> If no post-commit topology is used, we do not run into a problem where
> we could lose data, but since we do not clean up the state correctly,
> we will hit this [1] when trying to stop the pipeline with a savepoint
> after we have started it from a savepoint.
> AFAICT all two-phase commit sinks are affected: Kafka, File, etc.
>
> For sinks using the post-commit topology, the same applies.
> Additionally, we might never do the commit from the post-commit
> topology resulting in lost data.
>
> Best,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/ed46cb2fd64f1cb306ae5b7654d2b4d64ab69f22/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java#L83
>


Re: [DISCUSS]Introduce a time-segment based restart strategy

2022-11-30 Thread weijie guo
Hi, all,

Thank you very much for this interesting discussion.

TBH, Dong's proposal made me very excited. Our users don't need to be
tortured by choosing the right one among many strategies.

However, as Gen said, it may need some changes to the
RestartBackoffTimeStrategy interface, as it does not perceive the concept of
a pipelined region now, even though it would be easy to pass the required
parameters (the involved regions) to it. Personally, I think it is very
important to keep the protocol simple and to maintain the components' own
responsibilities well. The pipelined region should not be known to the
RestartBackoffTimeStrategy, at least at present; it is not necessary to make
this change before seeing more benefits.

Based on the above reasons and other factors he mentioned, +1 for Gen's
proposal.

Best regards,

Weijie


Gen Luo  于2022年11月25日周五 19:42写道:

> Hi all,
>
> Sorry for the late jumping in.
>
> To meet Weihua's need, Dong's proposal seems pretty fine, but the
> modification it requires, I'm afraid, is not really easy.
> RestartBackoffTimeStrategy is quite a simple interface. The strategy
> doesn't even know which task is failing, not to mention the division of
> pipelined regions.
> To distinguish the failure count of each region, it lacks too much
> information, which is not easy to acquire for the strategy.
> One approach I can figure out is to create different strategy instances for
> different regions. In this way we do not need to modify the strategy but do
> need to modify the schedulers or the ExecutionFailureHandler.
>
> On the other hand, I realize another case where the restart strategy may
> need to be aware of the types and occurrence rate of the exceptions: to
> avoid failing over and instead directly fail the job when certain errors
> happen.
> I know that there's an annotation
> `@ThrowableAnnotation(ThrowableType.NonRecoverableError)` that can fail the
> job, but I'm afraid there can be some scenarios where one cannot annotate
> the exceptions, or catch and wrap them in an annotated exception.
> In such cases, handling this in the restart strategy can be a good choice.
> Such a strategy can even be combined with other existing strategies which
> handle the failure rate rather than the cause type.
>
> Besides, given that new strategies may be necessary, and existing
> strategies may also need to be enhanced, maybe we should make the
> RestartBackoffTimeStrategy a plugin rather than an enumeration, or
> introduce a new custom strategy type which can load customized
> implementations.
> This cannot solve the problem immediately, but it makes the choice of
> restart strategy more flexible.
> What do you think about this?
>
> Thanks.
>
> Paul Lam  于 2022年11月21日周一 17:46写道:
>
> > Dong’s proposal LGTM.
> >
> > Best,
> > Paul Lam
> >
> > > 2022年11月19日 10:50,Dong Lin  写道:
> > >
> > > Hey Weihua,
> > >
> > > Thanks for proposing the new strategy!
> > >
> > > If I understand correctly, the main issue is that different failover
> > > regions can be restarted independently, but they share the same counter
> > > when counting the number of failures in an interval. So the number of
> > > failures for a given region is less than what users expect.
> > >
> > > Given that regions can be restarted independently, it might be more
> > usable
> > > and intuitive to count the number of failures for each region when
> > > executing the failover strategy. Thus, instead of adding a new failover
> > > strategy, how about we update the existing failure-rate strategy, and
> > > probably other existing strategies as well, to use the following
> > semantics:
> > >
> > > - For any given region in the job, its number of failures in
> > > failure-rate-interval should not exceed max-failures-per-interval.
> > > Otherwise, the job will fail without being restarted.
> > >
> > > By using this updated semantics, the keyby-connected job will have the
> > same
> > > behavior as the existing Flink when we use failure-rate strategy. For
> > > the rescale-connected
> > > job, in the case you described above, after the TM fails, each of the 3
> > > regions will increment its failure count from 0 to 1, which is still
> less
> > > than max-failures-per-interval. Thus the rescale-connected job can
> > continue
> > > to work.
> > >
> > > This alternative approach can solve the problem without increasing the
> > > complexity of the failover strategy choice. And this approach does not
> > > require us to check whether two exceptions belong to the same root
> cause.
> > > Do you think it can work?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Fri, Nov 4, 2022 at 4:46 PM Weihua Hu 
> wrote:
> > >
> > >> Hi, everyone
> > >>
> > >> I'd like to bring up a discussion about restart strategy. Flink
> > supports 3
> > >> kinds of restart strategy. These work very well for jobs with specific
> > >> configs, but for platform users who manage hundreds of jobs, there is
> no
> > >> common strategy to use.
> > >>
> > >> Let me explain the reason. We manage a lot of 
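The per-region failure accounting Dong proposes in the quoted thread could look roughly like the following sketch. All class and method names here are hypothetical illustrations, not Flink's actual RestartBackoffTimeStrategy API: each region keeps its own sliding window of failure timestamps, and a restart is refused only when a single region exceeds max-failures-per-interval.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-region failure-rate accounting (not the Flink API). */
class PerRegionFailureRate {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Map<String, Deque<Long>> failuresByRegion = new HashMap<>();

    PerRegionFailureRate(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure for the given region; returns true if a restart is still allowed. */
    boolean canRestart(String regionId, long nowMillis) {
        Deque<Long> failures =
                failuresByRegion.computeIfAbsent(regionId, r -> new ArrayDeque<>());
        // Drop failures that fell out of the sliding interval.
        while (!failures.isEmpty() && nowMillis - failures.peekFirst() > intervalMillis) {
            failures.pollFirst();
        }
        failures.addLast(nowMillis);
        return failures.size() <= maxFailuresPerInterval;
    }
}
```

With this semantics, a TM failure that restarts several regions increments each region's counter only once, so the rescale-connected job in the example keeps running.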

Re: [VOTE] FLIP-271: Autoscaling

2022-11-30 Thread Yun Tang
+ 1 (binding)

Best
Yun Tang

From: Mason Chen 
Sent: Wednesday, November 30, 2022 12:52
To: dev@flink.apache.org 
Subject: Re: [VOTE] FLIP-271: Autoscaling

+1 (non-binding)

On Tue, Nov 29, 2022 at 11:55 AM Ferenc Csaky 
wrote:

> +1 (non-binding)
>
>
>
>
> --- Original Message ---
> On Tuesday, November 29th, 2022 at 15:39, Márton Balassi <
> balassi.mar...@gmail.com> wrote:
>
>
> >
> >
> > +1 (binding)
> >
> > On Tue, Nov 29, 2022 at 6:13 AM Chenya Zhang chenyazhangche...@gmail.com
> >
> > wrote:
> >
> > > +1 (non-binding)
> > >
> > > On Sun, Nov 27, 2022 at 5:49 PM Jiangang Liu liujiangangp...@gmail.com
> > > wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Jiangang Liu
> > > >
> > > > Thomas Weise t...@apache.org 于2022年11月28日周一 06:23写道:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On Sat, Nov 26, 2022 at 8:11 AM Zheng Yu Chen jam.gz...@gmail.com
> > > > > wrote:
> > > > >
> > > > > > +1(no-binding)
> > > > > >
> > > > > > Maximilian Michels m...@apache.org 于 2022年11月24日周四 上午12:25写道:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > I'd like to start a vote for FLIP-271 [1] which we previously
> > > > > > > discussed
> > > > > > > on
> > > > > > > the dev mailing list [2].
> > > > > > >
> > > > > > > I'm planning to keep the vote open for at least until Tuesday,
> Nov
> > > > > > > 29.
> > > > > > >
> > > > > > > -Max
> > > > > > >
> > > > > > > [1]
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> > >
> > > > > > > [2]
> > > > > > >
> https://lists.apache.org/thread/pvfb3fw99mj8r1x8zzyxgvk4dcppwssz
>


[jira] [Created] (FLINK-30253) Introduce io executor for standalone checkpoint store

2022-11-30 Thread Yun Tang (Jira)
Yun Tang created FLINK-30253:


 Summary: Introduce io executor for standalone checkpoint store
 Key: FLINK-30253
 URL: https://issues.apache.org/jira/browse/FLINK-30253
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Yun Tang
 Fix For: 1.17.0


Currently, the standalone checkpoint store is the only one that does not have an 
IO executor for IO-related operations. We can introduce an IO executor to the 
standalone checkpoint store to speed up such IO operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-273: Improve Catalog API to Support ALTER TABLE syntax

2022-11-30 Thread Shengkai Fang
Hi,

Considering there has been no response for a long time, I want to start the
vote on Thursday.

Best,
Shengkai

Jingsong Li  于2022年11月24日周四 15:20写道:

> Thanks for the update!
>
> Looks good to me.
>
> Best,
> Jingsong
>
> On Thu, Nov 24, 2022 at 3:07 PM Shengkai Fang  wrote:
> >
> > Hi, Jingsong.
> >
> > Thanks for your feedback!
> >
> > > Can we define classes at once so that the connector can be fully
> > implemented, but we will not pass changes of the nested column.
> >
> > It's hard to achieve this. If a new need comes, we will add a new kind of
> > TableChange. But I think the current proposal aligns with the Flink
> syntax.
> >
> > > In addition, can we define the modified class of the nested column as
> the
> > parent class? The top-level column modification is just a special case
> of a
> > subclass.
> >
> > I think the modification of the composite type is much more complicated.
> > It also involves the MAP and ARRAY types. For example, Spark supports
> > modifying the element of an ARRAY type with the following syntax:
> >
> >
> > ```
> > -- add a field to the struct within an array. Using keyword 'element' to
> > access the array's element column.
> > ALTER TABLE prod.db.sample
> > ADD COLUMN points.element.z double
> > ```
> >
> > For the traditional database, they use the ALTER TYPE syntax to modify
> the
> > composite type.
> >
> > ```
> > CREATE TYPE compfoo AS (f1 int, f2 text);
> >
> > -- add column
> > ALTER TYPE compfoo ADD ATTRIBUTE f3 int;
> > -- rename column
> > ALTER TYPE compfoo RENAME ATTRIBUTE f3 TO f4;
> > -- drop column
> > ALTER TYPE compfoo DROP ATTRIBUTE f4;
> > -- modify type
> > ALTER TYPE compfoo ALTER ATTRIBUTE f1 TYPE text;
> >
> > ```
> >
> > I think the modification of the top-level column is different from the
> > modification of the nested field, and we don't need to introduce a base
> > class. BTW, the RexFieldAccess is also not the parent class of the
> > RexInputRef in Calcite.
> >
> > > ModifyColumn VS fine-grained Change
> >
> > The syntax in Flink also supports modifying the physical column to a
> > metadata column, metadata column definition, etc. The main problem here
> is
> > we need to expose what kind of changes happen when modifying the metadata
> > column, modifying the computed column, and changing the column kind. With
> > these possibilities, it may include:
> >
> > base: ModifyColumnComment, MoidfyColumnPosition;
> > physical column: ModifyPhysicalColumnDataType,
> > ModifyPhysicalColumnNullability;
> > metadata column: ModifyMetadataColumnMetadataKey,
> > ModifyMetadataColumnVirtual, ModifyMetdataColumnDataType;
> > computed column: ModifyComputedColumnExpression
> > column type change: ModifyPhysicalColumnToMetadataColumn,
> > ModifyPhysicalColumnToComputedColumn,
> ModifyComputedColumnToPhysicalColumn,
> > ModifyComputedColumnToMetadataColumn,
> > ModifyMetadataColumnToPhysicalColumn,
> ModifyMetadataColumnToComputedColumn.
> >
> > I just wonder whether it's better to still keep the ModifyColumn and
> > introduce some fine-grained TableChanges, because most of the cases above
> > are not useful to an external catalog, e.g. JDBCCatalog. So I think we can
> > introduce the ModifyColumn that represents column modification and
> >
> > ```
> > /** Modify the column comment */
> > public class ModifyColumnComment extends ModifyColumn {
> >
> > String getComment();
> >
> > }
> >
> > /** Modify the column position. */
> > public class ModifyColumnPosition extends ModifyColumn {}
> >
> > /** Modify the physical column data type. */
> > public class ModifyPhysicalColumnType extends ModifyColumn {
> >
> > DataType getNewType();
> >
> > }
> >
> > ```
> >
> > When altering the physical column, the statement will produce the
> > fine-grained changes. In other cases, we will still produce the base
> class
> > ModifyColumn.
> >
> >
> > > ModifyColumn can also rename the column.
> >
> > Yes. I have modified the FLIP and moved the RenameColumn and added a
> class
> > named ModifyColumnName extends ModifyColumn.
> >
> > Best,
> > Shengkai
>
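As an illustration of the fine-grained changes Shengkai sketches above, a catalog could pattern-match on the concrete change subclass and translate only the changes it understands. The class and method names below are hypothetical simplifications for illustration, not the proposed Flink API:

```java
/** Hypothetical, simplified sketch of fine-grained column changes (not the proposed API). */
class TableChanges {
    /** Base class: some modification of an existing column. */
    static class ModifyColumn {
        final String columnName;
        ModifyColumn(String columnName) { this.columnName = columnName; }
    }

    /** Fine-grained change: only the column comment was altered. */
    static class ModifyColumnComment extends ModifyColumn {
        final String newComment;
        ModifyColumnComment(String columnName, String newComment) {
            super(columnName);
            this.newComment = newComment;
        }
    }

    /** Fine-grained change: only the data type of a physical column was altered. */
    static class ModifyPhysicalColumnType extends ModifyColumn {
        final String newType;
        ModifyPhysicalColumnType(String columnName, String newType) {
            super(columnName);
            this.newType = newType;
        }
    }

    /** A catalog (e.g. a JDBC-backed one) translates only the changes it understands. */
    static String toDdl(String table, ModifyColumn change) {
        if (change instanceof ModifyColumnComment) {
            return "ALTER TABLE " + table + " ALTER COLUMN " + change.columnName
                    + " COMMENT '" + ((ModifyColumnComment) change).newComment + "'";
        } else if (change instanceof ModifyPhysicalColumnType) {
            return "ALTER TABLE " + table + " ALTER COLUMN " + change.columnName
                    + " TYPE " + ((ModifyPhysicalColumnType) change).newType;
        }
        throw new UnsupportedOperationException("Unsupported change for this catalog");
    }
}
```

A catalog that only receives the base ModifyColumn can still reject it explicitly, which matches the idea of producing fine-grained changes for physical columns and the base class otherwise.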


[jira] [Created] (FLINK-30252) Publish flink-shaded pom

2022-11-30 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-30252:


 Summary: Publish flink-shaded pom
 Key: FLINK-30252
 URL: https://issues.apache.org/jira/browse/FLINK-30252
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: shaded-17.0


Publish a bom for flink-shaded, such that downstream projects just select the 
flink-shaded version, with all other dependency versions being selected 
automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] Release flink-connector-jdbc v3.0.0, release candidate #1

2022-11-30 Thread Martijn Visser
Hi everyone,
Please review and vote on the release candidate #1 for the
flink-connector-jdbc version v3.0.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

Note: This is the first externalized version of the JDBC connector.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which are signed with the key with fingerprint
A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v3.0.0-rc1 [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Martijn

https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352590
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.0.0-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1554/
[5] https://github.com/apache/flink-connector-jdbc/releases/tag/v3.0.0-rc1
[6] https://github.com/apache/flink-web/pull/590


Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-30 Thread Yun Gao
Hi Dawid, Piotr
Very thanks for the discussion!
As a whole I think we are already consistent with the callback option, and I 
don't 
think I opposed that we could modify the current internal implementation. But 
from 
my side it is still not clear what the actual interfaces are proposing. Let me 
first try 
to summarize that a bit:
1) Which object do the handlers register on?
It seems there are two options: one is the timer services (InternalTimerService
/ ProcessingTimerService or some equivalent things after refactoring), the
other is the operator itself, as part of its lifecycle. I'm now leaning toward
the latter; what do you think on this part?
2) What is the interface of the handler?
Option 1 is:
interface SomeHandlerName {
 void processingTimer(Timer timer);
}
class Timer {
 long getTimestamp();
 void trigger();
 void cancel();
 // Other actions if required. 
}
But it seems there is controversy on whether to add actions to the timer class.
Without them, my understanding is that the interfaces of Option 2 are:
interface SomeHandlerName {
 void processTimer(Timer timer); 
}
interface KeyedSomeHandlerName {
 void processKeyedTimer(KeyedTimer timer, Context ctx); 
}
class Timer {
 long getTimestamp();
}
class KeyedTimer extends Timer {
 KEY getKey();
 NAMESPACE getNamespace();
}
interface Context {
void executeAtScheduledTime(Consumer handler);
}
As Piotr has pointed out, if we could eliminate the logic of namespace, we 
could then
remove the namespace related type parameter and method from the interfaces.
Do I understand right?
Besides, I still haven't fully got the reason why we should not add the actions
to the timer class, considering that in most cases users could implement their
logic by simply calling timer.trigger() (I think the repeated registration is
indeed a problem, but I think we could ignore the timers registered during
termination).
Could you further enlighten me a bit on this part?
Best,
Yun Gao
--
From:Piotr Nowojski 
Send Time:2022 Nov. 30 (Wed.) 17:10
To:dev 
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job 
Termination
Hi,
I have a couple of remarks.
First a general one. For me the important part in the design of this API is
how to expose this to Flink users in public interfaces. Namely
ProcessFunction and StreamOperator. InternalTimerService is an internal
class, so we can change it and break it as needed in the future.
For registering a handler like proposed by Dawid:
interface SomeHandlerName {
 void onTimer(/* whatever type it is */ timer, Context ctx ) {
 }
}
makes sense to me. For the InternalTimerService I think it doesn't matter
too much what we do. We could provide a similar interface as for the
ProcessFunction/StreamOperator, it doesn't have to be the same one. On the
contrary, I think it shouldn't be the same, as part of this effort we
shouldn't be exposing the concept of `Namespaces` to the public facing API.
Re the "waitFor". Theoretically I see arguments why users might want to use
this, but I'm also not convinced whether that's necessary in practice. I
would be +1 either way. First version can be without this functionality and
we can add it later (given that we designed a good place to add it in the
future, like the `Context` proposed by Dawid). But I'm also fine adding it
now if others are insisting.
Best,
Piotrek
śr., 30 lis 2022 o 09:18 Dawid Wysakowicz 
napisał(a):
> WindowOperator is not implemented by users. I can see that for
> InternalTimerService we'll need
>
> interface PendingTimerProcessor {
> void onTimer(InternalTimer timer) {
> doHandleTimer(timer);
> }
> }
>
> I don't see a problem with that.
>
> As you said ProcessingTimeService is a user facing interface and
> completely unrelated to the InternalTimerService. I don't see a reason
> why we'd need to unify those.
>
> As for the waitFor behaviour. Personally, I have not been convinced it
> is necessary. Maybe it's just my lack of vision, but I can't think of a
> scenario where I'd use it. Still if we need it, I'd go for something like:
>
> void onTimer(/* whatever type it is */ timer, Context ctx ) {
>
> }
>
> interface Context {
> void executeAtScheduledTime(Consumer handler);
> }
>
>
> That way you have independent simple interfaces that need to work only
> in a single well defined scenario and you don't need to match an
> interface to multiple different cases.
>
> Best,
> Dawid
>
> On 30/11/2022 07:27, Yun Gao wrote:
> > Hi Dawid,
> > Thanks for the comments!
> > As a whole I'm also open to the API and I also prefer to use simple
> > but flexible interfaces, but it still looks like there are some problems
> > with just letting users implement the termination actions.
> > Let's take the WindowOperator as an example. As seen in [1],
> > in the timer processing logic it needs to acquire the key / namespace
> > information bound to the timer (which is only supported by the
> InternalTimerService).
> > Thus 
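For readers following along, here is a minimal, self-contained sketch of the Option-1-style interface Yun Gao summarizes above: a Timer exposing trigger()/cancel(), plus a drain step that hands every pending timer to a handler at termination and drops timers registered while terminating. All names are hypothetical illustrations, not the actual Flink interfaces under discussion.

```java
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of a timer service that drains pending timers at termination. */
class DrainableTimerService {
    static final class Timer {
        private final long timestamp;
        private final Runnable action;
        private boolean cancelled;

        Timer(long timestamp, Runnable action) {
            this.timestamp = timestamp;
            this.action = action;
        }

        long getTimestamp() { return timestamp; }
        void trigger() { if (!cancelled) action.run(); }
        void cancel() { cancelled = true; }
    }

    private final PriorityQueue<Timer> pending =
            new PriorityQueue<>((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp()));
    private boolean terminating;

    Timer register(long timestamp, Runnable action) {
        Timer t = new Timer(timestamp, action);
        if (!terminating) { // drop registrations during termination, as discussed
            pending.add(t);
        }
        return t;
    }

    /** On termination, hand each pending timer to the handler, which may trigger or cancel it. */
    void drainOnTermination(Consumer<Timer> handler) {
        terminating = true;
        while (!pending.isEmpty()) {
            handler.accept(pending.poll());
        }
    }
}
```

A handler that simply calls trigger() on each pending timer reproduces the "fire everything on a final savepoint" behavior; a handler that calls cancel() reproduces today's drop-everything behavior.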

[jira] [Created] (FLINK-30251) Move the IO with DFS during abort checkpoint to an asynchronous thread.

2022-11-30 Thread ming li (Jira)
ming li created FLINK-30251:
---

 Summary: Move the IO with DFS during abort checkpoint to an 
asynchronous thread.
 Key: FLINK-30251
 URL: https://issues.apache.org/jira/browse/FLINK-30251
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.15.2, 1.16.0
Reporter: ming li
 Attachments: image-2022-11-30-19-10-51-226.png

Currently when the {{checkpoint}} fails, we process the abort message in the 
Task's {{{}mailbox{}}}. We will close the output stream and delete the file on 
DFS. 

 

However, when the {{checkpoint}} failure is caused by a DFS system failure (for 
example, the namenode failure of HDFS), this operation may take a long time or 
hang, leaving the Task unable to continue processing data.

 

So I think we can put the operation of deleting files in an asynchronous thread 
just like uploading checkpoint data asynchronously.
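The proposed asynchronous cleanup could be sketched roughly as follows (hypothetical names; Flink's actual mailbox and checkpoint abstractions are more involved): the abort path only submits the deletion to a dedicated IO executor, so the mailbox thread is never blocked by a slow or hanging DFS.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: aborting a checkpoint deletes its files off the mailbox thread. */
class AsyncAbortCleanup {
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

    /** Called on abort: only schedules the deletion; the caller returns immediately. */
    void abortCheckpoint(Path checkpointFile) {
        ioExecutor.execute(() -> {
            try {
                Files.deleteIfExists(checkpointFile); // may block on a slow/hanging DFS
            } catch (IOException e) {
                // Best effort: log and move on; the task must not be blocked by cleanup.
            }
        });
    }

    void close() throws InterruptedException {
        ioExecutor.shutdown();
        ioExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```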

 

!image-2022-11-30-19-10-51-226.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30250) The flame graph type is wrong

2022-11-30 Thread Rui Fan (Jira)
Rui Fan created FLINK-30250:
---

 Summary: The flame graph type is wrong
 Key: FLINK-30250
 URL: https://issues.apache.org/jira/browse/FLINK-30250
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST, Runtime / Web Frontend
Affects Versions: 1.16.0, 1.15.0, 1.17.0
Reporter: Rui Fan
 Fix For: 1.17.0
 Attachments: image-2022-11-30-19-08-42-067.png

When the flame graph type is switched from On-CPU to Mixed, it still shows the 
graph of On-CPU.
h2. Root cause:

When clicking another type, the web frontend calls requestFlameGraph 
and updates the graphType. However, the graphType is still the old type during 
requestFlameGraph. So the type selector shows the new type, but the flame graph 
is the result of the old type.

 [code link 
|https://github.com/apache/flink/blob/8bbf52688758bbede45df060a4c11e5fa228b6f0/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts#L82]

!image-2022-11-30-19-08-42-067.png|width=1026,height=389!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] Release flink-connector-pulsar v3.0.0, release candidate #1

2022-11-30 Thread Martijn Visser
Hi everyone,
Please review and vote on the release candidate #1 for the
flink-connector-pulsar version v3.0.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

Note: this is equivalent to the Pulsar connector that was released with
Flink 1.16. This is the externalized version.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which are signed with the key with fingerprint
A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v3.0.0-rc1 [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Martijn

https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352588
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-connector-pulsar-3.0.0-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1553/
[5] https://github.com/apache/flink-connector-pulsar/releases/tag/v3.0.0-rc1
[6] https://github.com/apache/flink-web/pull/589


[jira] [Created] (FLINK-30249) TableUtils.getRowTypeInfo() creating wrong TypeInformation

2022-11-30 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30249:
-

 Summary: TableUtils.getRowTypeInfo() creating wrong TypeInformation
 Key: FLINK-30249
 URL: https://issues.apache.org/jira/browse/FLINK-30249
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0
Reporter: Zhipeng Zhang
 Fix For: ml-2.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-30 Thread Piotr Nowojski
Hi,

I have a couple of remarks.

First a general one. For me the important part in the design of this API is
how to expose this to Flink users in public interfaces. Namely
ProcessFunction and StreamOperator. InternalTimerService is an internal
class, so we can change it and break it as needed in the future.

For registering a handler like proposed by Dawid:

interface SomeHandlerName {
  void onTimer(/* whatever type it is */ timer, Context ctx ) {
  }
}

makes sense to me. For the InternalTimerService I think it doesn't matter
too much what we do. We could provide a similar interface as for the
ProcessFunction/StreamOperator, it doesn't have to be the same one. On the
contrary, I think it shouldn't be the same, as part of this effort we
shouldn't be exposing the concept of `Namespaces` to the public facing API.

Re the "waitFor". Theoretically I see arguments why users might want to use
this, but I'm also not convinced whether that's necessary in practice. I
would be +1 either way. First version can be without this functionality and
we can add it later (given that we designed a good place to add it in the
future, like the `Context` proposed by Dawid). But I'm also fine adding it
now if others are insisting.

Best,
Piotrek

śr., 30 lis 2022 o 09:18 Dawid Wysakowicz 
napisał(a):

> WindowOperator is not implemented by users. I can see that for
> InternalTimerService we'll need
>
> interface PendingTimerProcessor {
> void onTimer(InternalTimer timer) {
>   doHandleTimer(timer);
>   }
> }
>
> I don't see a problem with that.
>
> As you said ProcessingTimeService is a user facing interface and
> completely unrelated to the InternalTimerService. I don't see a reason
> why we'd need to unify those.
>
> As for the waitFor behaviour. Personally, I have not been convinced it
> is necessary. Maybe it's just my lack of vision, but I can't think of a
> scenario where I'd use it. Still if we need it, I'd go for something like:
>
> void onTimer(/* whatever type it is */ timer, Context ctx ) {
>
> }
>
> interface Context {
>void executeAtScheduledTime(Consumer handler);
> }
>
>
> That way you have independent simple interfaces that need to work only
> in a single well defined scenario and you don't need to match an
> interface to multiple different cases.
>
> Best,
> Dawid
>
> On 30/11/2022 07:27, Yun Gao wrote:
> > Hi Dawid,
> > Thanks for the comments!
> > As a whole I'm also open to the API and I also prefer to use simple
> > but flexible interfaces, but it still looks like there are some problems
> > with just letting users implement the termination actions.
> > Let's take the WindowOperator as an example. As seen in [1],
> > in the timer processing logic it needs to acquire the key / namespace
> > information bound to the timer (which is only supported by the
> > InternalTimerService).
> > Thus if we want users to implement the same logic on termination, we
> > either let users trigger the timer handler directly or we also allow
> > users to access these pieces of information. If we go with the latter
> > direction, we might need to provide interfaces like
> > interface PendingTimerProcessor {
> > void onTimer(Timer timer) {
> >   doHandleTimer(timer);
> >   }
> > }
> > class Timer {
> >   long getTimestamp();
> >   KEY getKey();
> >   NAMESPACE getNamespace();
> > }
> > Then we'll have the issue that since we need the interface to handle
> > both cases of the InternalTimerService and the raw ProcessTimeService:
> > the latter does not have key and namespace information attached, and it
> > is also a bit inconsistent for users to have to set
> > the KEY and NAMESPACE types.
> > Besides, it looks to me that if we want to implement behaviors like
> > waiting, it might not be as simple as reusing the timer handler; it
> > requires every operator author to re-implement such waiting logic.
> >> Moreover it still have the downside that if you call back to the
> `onTimer` method after
> >> `trigger` you have access to the Context which lets you register new
> timers.
> > I think we could simply drop the timers registered while we are
> > processing the pending timers on termination. Logically there should be
> > no new data after termination.
> >> I think I am not convinced to these arguments. First of all I'm afraid
> there is no clear distinction
> >> in that area what is runtime and what is not. I always found
> `AbstracStreamOperator(*)` actually part
> >> of runtime or Flink's internals and thus I don't find
> `InternalTimerService` a utility, but a vital part
> >> of the system. Let's be honest it is impossible to implement an
> operator without extending from
> > `AbstractStreamOperator*`. What would be the problem with having a
> proper implementation in
> >> `InternalTimerService`? Can't we do it like this?:
> > I think the original paragraph is only an explanation that the interface
> > is harder to support if we allow users to implement arbitrary logic. But
> > since now we are
> at the page with the 

[jira] [Created] (FLINK-30248) Spark writer supports insert overwrite

2022-11-30 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30248:


 Summary: Spark writer supports insert overwrite
 Key: FLINK-30248
 URL: https://issues.apache.org/jira/browse/FLINK-30248
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.3.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-dynamodb v3.0.0, release candidate #0

2022-11-30 Thread Danny Cranmer
+1 (binding)

- Validated hashes/signature
- Verified that no binaries exist in the source archive
- Build the source with Maven
- Verified NOTICE file
- Verified versions in pom files are correct
- Verified SinkIntoDynamoDb sample application writes to DynamoDB

Thanks,
Danny

On Tue, Nov 29, 2022 at 9:26 PM Martijn Visser 
wrote:

> Hi Danny,
>
> +1 (binding)
>
> - Validated hashes
> - Verified signature
> - Verified that no binaries exist in the source archive
> - Build the source with Maven
> - Verified licenses
> - Verified web PR
>
> Best regards,
>
> Martijn
>
> On Tue, Nov 29, 2022 at 8:37 PM Martijn Visser  wrote:
>
> > Hi Danny,
> >
> > +1 (binding)
> >
> > - Validated hashes
> > - Verified signature
> > - Verified that no binaries exist in the source archive
> > - Build the source with Maven
> > - Verified licenses
> > - Verified web PR
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, Nov 29, 2022 at 12:57 PM Hamdy, Ahmed
> 
> > wrote:
> >
> >> +1 (non-binding)
> >>
> >> On 29/11/2022, 08:27, "Teoh, Hong" 
> wrote:
> >>
> >> CAUTION: This email originated from outside of the organization. Do
> >> not click links or open attachments unless you can confirm the sender
> and
> >> know the content is safe.
> >>
> >>
> >>
> >> +1 (non-binding)
> >>
> >> * Hashes and Signatures look good
> >> * All required files on dist.apache.org
> >> * Tag is present in Github
> >> * Verified source archive does not contain any binary files
> >> * Source archive builds using maven
> >> * Started packaged example SQL job using SQL client. Verified that
> it
> >> writes successfully to the sink table.
> >> * Verified sink metrics look ok.
> >>
> >>
> >> Cheers,
> >> Hong
> >>
> >> On 28/11/2022, 16:44, "Danny Cranmer" 
> >> wrote:
> >>
> >> CAUTION: This email originated from outside of the organization.
> >> Do not click links or open attachments unless you can confirm the sender
> >> and know the content is safe.
> >>
> >>
> >>
> >> Hi everyone,
> >> Please review and vote on the release candidate #0 for the
> >> version 3.0.0 as
> >> follows:
> >> [ ] +1, Approve the release
> >> [ ] -1, Do not approve the release (please provide specific
> >> comments)
> >>
> >>
> >> The complete staging area is available for your review, which
> >> includes:
> >> * JIRA release notes [1],
> >> * the official Apache source release to be deployed to
> >> dist.apache.org [2],
> >> which are signed with the key with fingerprint 125FD8DB [3],
> >> * all artifacts to be deployed to the Maven Central Repository
> >> [4],
> >> * source code tag v3.0.0-rc0 [5],
> >> * website pull request listing the new release [6].
> >>
> >> The vote will be open for at least 72 hours (Thursday 1st
> >> December 17:00
> >> UTC). It is adopted by majority approval, with at least 3 PMC
> >> affirmative
> >> votes.
> >>
> >> Please note, this is a new connector and the first release.
> >>
> >> Thanks,
> >> Danny
> >>
> >> [1]
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352277
> >> [2]
> >>
> >>
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-aws-3.0.0-rc0
> >> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> >> [4]
> >> https://repository.apache.org/content/repositories/orgapacheflink-1552/
> >> [5]
> >> https://github.com/apache/flink-connector-aws/releases/tag/v3.0.0-rc0
> >> [6] https://github.com/apache/flink-web/pull/588
> >>
> >>
> >>
>


Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-30 Thread Dawid Wysakowicz
WindowOperator is not implemented by users. I can see that for 
InternalTimerService we'll need


interface PendingTimerProcessor {
    void onTimer(InternalTimer timer) {
        doHandleTimer(timer);
    }
}

I don't see a problem with that.

As you said ProcessingTimeService is a user facing interface and 
completely unrelated to the InternalTimerService. I don't see a reason 
why we'd need to unify those.


As for the waitFor behaviour. Personally, I have not been convinced it 
is necessary. Maybe it's just my lack of vision, but I can't think of a 
scenario where I'd use it. Still if we need it, I'd go for something like:


void onTimer(/* whatever type it is */ timer, Context ctx) {

}

interface Context {
    void executeAtScheduledTime(Consumer handler);
}


That way you have independent simple interfaces that need to work only 
in a single well defined scenario and you don't need to match an 
interface to multiple different cases.
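A minimal runnable sketch of this idea, with made-up names (`DrainContext`, `PendingTimer`, `TimerDrainSketch`) since none of this is existing Flink API: on termination the pending timers are drained in timestamp order, and each handler can either fire immediately or ask, via the context, to be executed at its originally scheduled time (recorded here rather than actually waited for).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Illustrative sketch only; all names are hypothetical, not Flink API.
public class TimerDrainSketch {

    interface DrainContext {
        // The handler may ask to be re-run once the clock reaches the
        // timer's original timestamp instead of firing immediately.
        void executeAtScheduledTime(Consumer<Long> handler);
    }

    static class PendingTimer implements Comparable<PendingTimer> {
        final long timestamp;
        PendingTimer(long ts) { this.timestamp = ts; }
        public int compareTo(PendingTimer o) {
            return Long.compare(timestamp, o.timestamp);
        }
    }

    public static void main(String[] args) {
        PriorityQueue<PendingTimer> queue = new PriorityQueue<>();
        queue.add(new PendingTimer(100));
        queue.add(new PendingTimer(50));

        List<String> log = new ArrayList<>();

        // On termination: drain all pending timers in timestamp order.
        while (!queue.isEmpty()) {
            PendingTimer timer = queue.poll();
            // In this sketch, deferral is just recorded instead of scheduled.
            DrainContext ctx = handler -> log.add("deferred@" + timer.timestamp);
            // The "onTimer" body: either handle immediately, or defer.
            if (timer.timestamp >= 100) {
                ctx.executeAtScheduledTime(ts -> { /* would fire later */ });
            } else {
                log.add("fired@" + timer.timestamp);
            }
        }

        if (!log.equals(List.of("fired@50", "deferred@100"))) {
            throw new AssertionError(log);
        }
        System.out.println(log);
    }
}
```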


Best,
Dawid

On 30/11/2022 07:27, Yun Gao wrote:

Hi Dawid,
Thanks for the comments!
Overall I'm also open on the API, and I also prefer simple but flexible
interfaces, but it still looks like there are some problems with just
letting users implement the termination actions.
Let's take the WindowOperator as an example. As seen in [1],
the timer processing logic needs to acquire the key / namespace
information bound to the timer (which is only provided by the
InternalTimerService).
Thus if we want users to implement the same logic on termination, we either
let users trigger the timer handler directly, or we also allow users to access
this piece of information. If we go with the latter direction, we might need
to provide interfaces like:
interface PendingTimerProcessor<KEY, NAMESPACE> {
    void onTimer(Timer<KEY, NAMESPACE> timer) {
        doHandleTimer(timer);
    }
}

class Timer<KEY, NAMESPACE> {
    long getTimestamp();
    KEY getKey();
    NAMESPACE getNamespace();
}
Then we'll have the issue that since the interface needs to handle both the
InternalTimerService case and the raw ProcessingTimeService case, and the
latter has no key and namespace information attached, it is also a bit
inconsistent for users to have to set the KEY and NAMESPACE types.
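A small self-contained sketch of that inconsistency (all names are illustrative, not Flink API): with a generic `Timer<KEY, NAMESPACE>`, the keyed `InternalTimerService` case fills both type parameters naturally, while the raw `ProcessingTimeService` case is forced into `Void` and `null` placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; all names are hypothetical, not Flink API.
public class GenericTimerSketch {

    static class Timer<KEY, NAMESPACE> {
        final long timestamp;
        final KEY key;
        final NAMESPACE namespace;
        Timer(long timestamp, KEY key, NAMESPACE namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    interface PendingTimerProcessor<KEY, NAMESPACE> {
        void onTimer(Timer<KEY, NAMESPACE> timer);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();

        // Keyed case (InternalTimerService): key and namespace are meaningful.
        PendingTimerProcessor<String, String> keyed =
                t -> seen.add(t.key + "/" + t.namespace + "@" + t.timestamp);
        keyed.onTimer(new Timer<>(42L, "user-1", "window-ns"));

        // Raw ProcessingTimeService case: forced to use Void for both.
        PendingTimerProcessor<Void, Void> raw =
                t -> seen.add("raw@" + t.timestamp);
        raw.onTimer(new Timer<>(7L, null, null));

        if (!seen.equals(List.of("user-1/window-ns@42", "raw@7"))) {
            throw new AssertionError(seen);
        }
        System.out.println(seen);
    }
}
```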
Besides, it looks to me that if we want to implement behaviors like waiting,
we might not be able to simply reuse the timer handler; every operator author
would then have to re-implement such waiting logic.

Moreover it still has the downside that if you call back to the `onTimer`
method after `trigger` you have access to the Context which lets you
register new timers.

I think we could simply drop the timers registered while we are processing the
pending timers on termination. Logically there should be no new data after
termination.
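A sketch of that dropping behavior (names made up for illustration, not Flink API): once termination starts, the pending queue is drained in timestamp order and any timer a handler tries to register during the drain is silently discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative sketch only; all names are hypothetical, not Flink API.
public class DropNewTimersSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private boolean terminating = false;
    final List<Long> fired = new ArrayList<>();

    void registerTimer(long ts) {
        if (terminating) {
            return; // logically no new data after termination: drop it
        }
        timers.add(ts);
    }

    void drainOnTermination() {
        terminating = true;
        while (!timers.isEmpty()) {
            long ts = timers.poll();
            fired.add(ts);
            // A handler trying to re-register during the drain is ignored.
            registerTimer(ts + 10);
        }
    }

    public static void main(String[] args) {
        DropNewTimersSketch op = new DropNewTimersSketch();
        op.registerTimer(1);
        op.registerTimer(2);
        op.drainOnTermination();
        if (!op.fired.equals(List.of(1L, 2L))) {
            throw new AssertionError(op.fired);
        }
        System.out.println(op.fired);
    }
}
```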

I think I am not convinced by these arguments. First of all, I'm afraid there is
no clear distinction in that area between what is runtime and what is not. I have
always found `AbstractStreamOperator(*)` to actually be part of the runtime or
Flink's internals, and thus I don't find `InternalTimerService` a utility, but a
vital part of the system. Let's be honest: it is impossible to implement an
operator without extending from `AbstractStreamOperator*`. What would be the
problem with having a proper implementation in `InternalTimerService`? Can't we
do it like this?:

I think the original paragraph was only explaining that the interface is harder
to support if we allow users to implement arbitrary logic. But since we are now
on the same page with the callback option, users can always be allowed to
implement arbitrary logic no matter whether we support timer.trigger() or not,
so I think there is no divergence on this point now. I also believe we'll
finally have some logic similar to the proposed one that drains all the timers
and processes them.
Best,
Yun Gao
[1] 
https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
 

--
From:Dawid Wysakowicz 
Send Time:2022 Nov. 28 (Mon.) 23:33
To:dev 
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job 
Termination
Do we really need to have separate methods for triggering/waiting/cancelling?
To me it sounds rather counterintuitive. Why can't users just execute whatever
they want in the handler itself instead of an additional back and forth with the
system? Moreover it still has the downside that if you call back to the
`onTimer` method after `trigger` you have access to the Context which lets you
register new timers.
I find the following approach much simpler:

void onTimer(...) {
    doHandleTimer(timestamp);
}

void processPendingTimer(...) {
    // trigger
    doHandleTimer(timestamp);
    // for cancel, 

[jira] [Created] (FLINK-30247) Introduce Time Travel reading for table store

2022-11-30 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30247:


 Summary: Introduce Time Travel reading for table store
 Key: FLINK-30247
 URL: https://issues.apache.org/jira/browse/FLINK-30247
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.3.0


For example:
- SELECT * FROM T /*+ OPTIONS('as-of-timestamp-mills'='121230') */; Read the
snapshot specified by commit time.
- SELECT * FROM T /*+ OPTIONS('as-of-snapshot'='12') */; Read the snapshot
specified by snapshot id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)