[jira] [Created] (FLINK-30585) Improve flame graph performance at subtask level

2023-01-05 Thread Rui Fan (Jira)
Rui Fan created FLINK-30585:
---

 Summary: Improve flame graph performance at subtask level
 Key: FLINK-30585
 URL: https://issues.apache.org/jira/browse/FLINK-30585
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan


After FLINK-30185, we can view the flame graph at the subtask level. However, it 
always collects flame graphs for all subtasks.

We should collect the flame graph of a single subtask instead of all subtasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30584) Update the flame graph doc of subtask level

2023-01-05 Thread Rui Fan (Jira)
Rui Fan created FLINK-30584:
---

 Summary: Update the flame graph doc of subtask level
 Key: FLINK-30584
 URL: https://issues.apache.org/jira/browse/FLINK-30584
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
 Fix For: 1.17.0


Update the flame graph documentation for the subtask level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30583) Provide the flame graph to the subtask level

2023-01-05 Thread Rui Fan (Jira)
Rui Fan created FLINK-30583:
---

 Summary: Provide the flame graph to the subtask level
 Key: FLINK-30583
 URL: https://issues.apache.org/jira/browse/FLINK-30583
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST, Runtime / Web Frontend
Reporter: Rui Fan
 Fix For: 1.17.0


Provide the flame graph down to the subtask level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-05 Thread Dong Lin
Hi John and Wencong,

Thanks for the reply!

It is nice that option-2 can address the problem without affecting the
existing source connectors as far as functionality is concerned. One
potential concern with this approach is that it might increase the Flink
runtime overhead by adding one more virtual function call to the
per-record runtime call stack.

Since Java's default MaxInlineLevel is 12-18, I believe it is easy for an
operator chain of 5+ operators to exceed this limit. In that case,
option-2 would incur one more virtual table lookup to produce each record.
It is not clear how much this overhead would show up for jobs with a chain
of lightweight operators. I have recently been working on FLINK-30531 to
reduce runtime overhead, which might be related to this discussion.

In comparison to option-2, the option-3 provided in my earlier email would
not add this extra overhead. I think it might be worthwhile to invest in
the long-term performance (and simpler runtime infra) and pay for the
short-term cost of deprecating this metric in SourceOperatorBase. What do
you think?
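
For illustration, here is a minimal, self-contained sketch (hypothetical names, not
Flink's actual SourceReaderBase API) of what an overridable per-record counting hook
looks like and why it puts one extra virtual call on the per-record path:

```java
// Hypothetical sketch of "option-2": the base reader invokes an overridable hook
// for every emitted record. Names are illustrative, not Flink's actual API.
import java.util.function.Consumer;

abstract class SourceReaderBaseSketch<T> {

    private long numRecordsIn;

    // Subclasses may override this to report numRecordsIn themselves,
    // e.g. to avoid double counting or to use a connector-specific counter.
    protected void onRecordEmitted(T record) {
        numRecordsIn++;
    }

    // Per-record path: because onRecordEmitted can be overridden, the call is a
    // virtual dispatch, and deep operator chains make it harder for the JIT to inline.
    final void emitRecord(T record, Consumer<T> downstream) {
        onRecordEmitted(record);
        downstream.accept(record);
    }

    long getNumRecordsIn() {
        return numRecordsIn;
    }
}
```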

Regards,
Dong


On Thu, Jan 5, 2023 at 10:10 PM Wencong Liu  wrote:

> Hi, All
>
>
> Thanks for the reply!
>
>
> I think both John and Dong's opinions are reasonable. John's Suggestion 2
> is a good implementation.
> It does not affect the existing source connectors, and it also provides
> support for custom counters in future implementations.
>
>
> WDYT?
>
>
> Best,
>
> Wencong Liu


Re: [DISCUSS] FLIP-282: Introduce Delete & Update API

2023-01-05 Thread Jingsong Li
Thanks yuxia for your explanation.

But what I mean is that this may lead to confusion for implementers
and users. You can use comments to explain it. However, a good
interface can make the mechanism clearer through code design.

So here, I still think an independent SupportsXX interface can make
the behavior clearer.

Best,
Jingsong
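
To make the suggestion concrete, here is a minimal sketch of what such an independent
ability interface could look like; the names are purely illustrative and are not the
interfaces actually proposed in FLIP-282:

```java
// Illustrative sketch only: a dedicated ability interface so that only sources that
// opt in are told why they are being scanned. Not the FLIP's actual API.
public interface SupportsScanPurpose {

    /** Why the planner scans this table. */
    enum ScanPurpose {
        QUERY,
        DELETE,
        UPDATE
    }

    /** Called by the planner before translation so the source can adjust its behavior. */
    void applyScanPurpose(ScanPurpose purpose);
}
```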

On Wed, Jan 4, 2023 at 10:56 AM yuxia  wrote:
>
> Hi, Jingsong, thanks for your comments.
>
> ## About RowLevelDeleteMode
> That's really a good suggestion; I have now updated the FLIP to make
> RowLevelDeleteMode a higher level.
>
> ## About scope of addContextParameter
> Sorry for the confusion; I have now updated the FLIP to add more comments for
> it. The scope of the parameters is limited to the phase in which
> Flink translates physical RelNodes to ExecNodes.
> It's possible to see all the other sources and sinks in a topology. Regarding the
> order: if there is only one sink, the sink will be the last one to see the parameters;
> the order for the sources is consistent with the order in which the table source nodes
> are translated to ExecNodes.
> If there are multiple sinks, as in the case of a StatementSet, a sink may also
> see the parameters added by the table sources that belong to statements
> added earlier.
>
> ## About scope of getScanPurpose
> Yes, all sources will see this method. But it won't bring any compatibility
> issues because here we just tell the source scan
> what the scan purpose is without touching any other logic. If a source ignores this
> method, it just works as normal. So I think there's
> no need to add a new interface like SupportsXX.
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Jingsong Li" 
> To: "dev" 
> Sent: Tuesday, January 3, 2023 12:12:12 PM
> Subject: Re: [DISCUSS] FLIP-282: Introduce Delete & Update API
>
> Thanks yuxia for the FLIP! It looks really good!
>
> I have three comments:
>
> ## RowLevelDeleteMode
>
> Can RowLevelDeleteMode be a higher level?
> `SupportsRowLevelDelete.RowLevelDeleteMode` is better than
> `SupportsRowLevelDelete.RowLevelDeleteInfo.RowLevelDeleteMode`.
> Same as `RowLevelUpdateMode`.
>
> ## Scope of addContextParameter
>
> I see that some of your comments are for sink, but can you make it
> clearer here? What exactly is its scope? For example, is it possible
> to see all the other sources and sinks in a topo? What is the order of
> seeing?
>
> ## Scope of getScanPurpose
>
> Will all sources see this method? Will there be compatibility issues?
> If sources ignore this method, will this cause strange phenomena?
>
> What I mean is: should another SupportsXX be created here to provide
> delete and update.
>
> Best,
> Jingsong
>
> On Thu, Dec 29, 2022 at 6:23 PM yuxia  wrote:
> >
> > Hi, Lincoln Lee;
> > 1: Yes, it's a typo; thanks for pointing it out. I have fixed the typo.
> > 2: For stream users, taking delete as an example, they will receive
> > TableException("DELETE TABLE is not supported for streaming mode now");
> > update is similar. I have also added these to the FLIP.
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Lincoln Lee" 
> > To: "dev" 
> > Sent: Wednesday, December 28, 2022 9:50:50 AM
> > Subject: Re: [DISCUSS] FLIP-282: Introduce Delete & Update API
> >
> > Hi yuxia,
> >
> > Thanks for the proposal! I think it'll be very useful for users in batch
> > scenarios to cooperate with external systems.
> >
> > For the flip I have two questions:
> > 1. Is the default method 'default ScanPurpose getScanPurpose();' without an
> > implementation in the ScanContext interface a typo?
> > 2. For stream users, what exceptions will be received for these unsupported
> > operations?
> >
> > Best,
> > Lincoln Lee
> >
> >
> > yuxia wrote on Monday, December 26, 2022 at 20:24:
> >
> > > Hi, devs.
> > >
> > > I'd like to start a discussion about FLIP-282: Introduce Delete & Update
> > > API[1].
> > >
> > > Row-Level SQL Delete & Update are becoming more and more important in
> > > modern big data workflow. The use cases include deleting a set of rows for
> > > regulatory compliance, updating a set of rows for data correction, etc.
> > > So, in this FLIP, I want to introduce Delete & Update API to Flink in
> > > batch mode. With these interfaces, the external connectors will have the
> > > ability to delete & update existing data in the corresponding storage systems.
> > >
> > > Looking forward to your feedback.
> > >
> > > [1]:
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235838061
> > >
> > >
> > > Best regards,
> > > Yuxia
> > >
> > >


[jira] [Created] (FLINK-30582) Flink-avro Flink-orc free for flink-table-store-format

2023-01-05 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30582:


 Summary: Flink-avro Flink-orc free for flink-table-store-format
 Key: FLINK-30582
 URL: https://issues.apache.org/jira/browse/FLINK-30582
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.4.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: flink add multiple sink in sequence

2023-01-05 Thread Shammon FY
Hi @Great

I think the two sinks in your example are equivalent and independent. If
there is a logical relationship between the two sinks, you may need to
create a new combined sink and implement the ordering yourself.
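
As a rough illustration of such a combined sink, here is a minimal sketch; the
database and Kafka clients are hypothetical placeholders, and exactly-once or
transactional concerns are ignored:

```java
// Sketch of a combined sink enforcing "database first, Kafka only on success".
// MyDatabaseClient and MyKafkaClient are illustrative placeholders, not real APIs.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DbThenKafkaSink extends RichSinkFunction<String> {

    private transient MyDatabaseClient db;
    private transient MyKafkaClient kafka;

    @Override
    public void open(Configuration parameters) {
        db = new MyDatabaseClient();
        kafka = new MyKafkaClient();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        db.write(value);    // "Sink1" logic: if this throws, the record never reaches Kafka
        kafka.send(value);  // "Sink2" logic: only runs after the database write succeeded
    }

    // Placeholder clients, defined only to keep the sketch self-contained.
    static class MyDatabaseClient { void write(String v) { /* JDBC insert here */ } }
    static class MyKafkaClient { void send(String v) { /* producer.send here */ } }
}
```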

On Thu, Jan 5, 2023 at 11:48 PM Great Info  wrote:

>
> I have a stream from Kafka, after reading it and doing some
> transformations/enrichment I need to store the final data stream in the
> database and publish it to Kafka so I am planning to add two sinks like
> below
>
>
> finalInputStream.addSink(dataBaseSink);       // Sink1
> finalInputStream.addSink(flinkKafkaProducer); // Sink2
>
> Is the sequence guaranteed between Sink1 and Sink2? In my requirement, the
> stream to Sink2 should begin only after Sink1 completes successfully;
> if Sink1 fails, it should not write to Sink2.
>


[ANNOUNCE] Flink 1.17 feature freeze extended until January 31st, 2023

2023-01-05 Thread Qingsheng Ren
Hi devs,

As discussed previously in the mailing list [1], the release managers of
Flink 1.17 have decided to extend the feature freeze until *January 31st,
2023*, which gives us 3 weeks for development from now.

Considering the entire timeline, there will be no extension after January
31st, 2023. Please arrange new features in the next release if they cannot
be finished before the closing date.

Thanks everyone for your excellent work on Flink 1.17, and wish you a
happy new year!

[1] https://lists.apache.org/thread/6v9mjx1jgf4l083hqw54rp4cf5fsnfom

Best regards,
Qingsheng, Leonard, Matthias and Martijn


[jira] [Created] (FLINK-30581) Deprecate FileStoreTableITCase and use CatalogITCaseBase

2023-01-05 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30581:


 Summary: Deprecate FileStoreTableITCase and use CatalogITCaseBase
 Key: FLINK-30581
 URL: https://issues.apache.org/jira/browse/FLINK-30581
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.4.0


We recommend that users use catalog tables instead of managed tables.
Managed tables should be deprecated. We no longer expose managed tables in the 
documentation, so we can remove them.
Before removing them, the tests should be refactored.

FileStoreTableITCase, which uses managed tables, should be changed to CatalogITCaseBase.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30580) [umbrella] Refactor tests for table store

2023-01-05 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30580:


 Summary: [umbrella] Refactor tests for table store
 Key: FLINK-30580
 URL: https://issues.apache.org/jira/browse/FLINK-30580
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.4.0


This is an umbrella issue to improve tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30579) Introducing configurable option to enable hive native function

2023-01-05 Thread dalongliu (Jira)
dalongliu created FLINK-30579:
-

 Summary: Introducing configurable option to enable hive native 
function
 Key: FLINK-30579
 URL: https://issues.apache.org/jira/browse/FLINK-30579
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Affects Versions: 1.16.0
Reporter: dalongliu
 Fix For: 1.17.0


Currently, the hive native function implementation can't fully align its behavior with 
the hive UDAF, so we should introduce a configurable option to allow enabling this 
optimization; the default behavior is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30578) AVRO-3700: Publish SBOM artifacts

2023-01-05 Thread Dongjoon Hyun (Jira)
Dongjoon Hyun created FLINK-30578:
-

 Summary: AVRO-3700: Publish SBOM artifacts
 Key: FLINK-30578
 URL: https://issues.apache.org/jira/browse/FLINK-30578
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.17.0
Reporter: Dongjoon Hyun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30577) OpenShift FlinkSessionJob artifact write error on non-default namespaces

2023-01-05 Thread James Busche (Jira)
James Busche created FLINK-30577:


 Summary: OpenShift FlinkSessionJob artifact write error on 
non-default namespaces
 Key: FLINK-30577
 URL: https://issues.apache.org/jira/browse/FLINK-30577
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: James Busche


[~tagarr] has pointed out an issue with using the /opt/flink/artifacts 
filesystem on OpenShift in non-default namespaces. The OpenShift permissions 
don't allow writing to /opt.
```
org.apache.flink.util.FlinkRuntimeException: Failed to create the dir: 
/opt/flink/artifacts/jim/basic-session-deployment-only-example/basic-session-job-only-example
```
A few ways to solve the problem are:
1. Uncomment line 34 in 
[flink-conf.yaml|https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/conf/flink-conf.yaml#L34]
 and change it to: /tmp/flink/artifacts

2. Append this after line 143 in 
[values.yaml|https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/values.yaml#L142]:
kubernetes.operator.user.artifacts.base.dir: /tmp/flink/artifacts

3. Change line 142 of 
[KubernetesOperatorConfigOptions.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java#L142]
 to:
.defaultValue("/tmp/flink/artifacts") 
and then rebuild the operator image (a sketch follows below).
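
For clarity, a hypothetical sketch of what the changed option definition from fix 3
might look like; only the config key and the new default value come from the report
above, while the field name and description are illustrative:

```java
// Hypothetical sketch of fix 3: the changed default in KubernetesOperatorConfigOptions.
// Field name and description are illustrative; only the key and default come from above.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ArtifactDirOptionSketch {
    public static final ConfigOption<String> OPERATOR_USER_ARTIFACTS_BASE_DIR =
        ConfigOptions.key("kubernetes.operator.user.artifacts.base.dir")
            .stringType()
            .defaultValue("/tmp/flink/artifacts") // was /opt/flink/artifacts, not writable on OpenShift
            .withDescription("Base directory for user artifacts uploaded for FlinkSessionJobs.");
}
```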






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: CDC from Oracle database reading directly logs - integration with OpenLogReplicator

2023-01-05 Thread Adam Leszczyński
Thanks Leonard, Jark,

I will just reply on the dev list for this topic as it is more related to 
development. Sorry for sending to 2 lists; I don't want to add more chaos 
here.

The answer to your question is not straightforward, so I will start from a broader 
picture.

Maybe first I will describe some assumptions that I have chosen while designing 
OpenLogReplicator. The project aims to be minimalistic. It should only 
contain the code that is necessary to parse Oracle redo logs. Nothing 
more; it should not be a fully functional replicator. So, the targets are 
limited to middleware (like Kafka, Flink, some MQ). The number of dependencies 
is kept to a minimum.

The second assumption is to make the project stateless wherever possible. The 
goal is to run it on HA (Kubernetes) and store state in Redis (not yet 
implemented). But generally OpenLogReplicator should not track the information 
(if possible) about the position of data confirmed by the receiver. This allows 
the receiver to choose its own way of handling failures (data duplicated on 
restart, idempotent messages).

The third topic is the initial data load. There is plenty of software available for 
that. There is absolutely no need to duplicate it in this project. No ETL, 
selects, etc. My goal is just to track changes.

The fourth assumption is to write the code in C++ so that the code is fast and I 
have full control over memory. The code can fully reuse memory and also work 
on machines with little memory. This allows easy compilation on Linux, but 
maybe in the future also on Solaris, AIX, HP-UX, or even Windows (if there is 
demand for that). I think Java is good for some solutions but not for a binary 
parser which works heavily with memory and in most cases uses a zero-copy 
approach.

The amount of data in the output is actually defined by the source database (how much 
data is logged: the full schema or just the changed columns). I don't care; the user 
defines what is logged by the database. If it is just the primary key and changed 
columns, I can send just the changed data. If someone wants the full schema in every 
payload, this is fine too. If the schema changes, no problem: I can provide just the 
DDL commands and process further payloads with the new schema.

The format of the data is actually defined by the receiver. My first choice was 
JSON. Next, the Debezium guys asked me to support Protobuf. OK, I spent a 
lot of time and extended the architecture to make the code modular and 
allow choosing the format of the payload. The writer module can directly 
produce a JSON or Protobuf payload. That can be extended to any other 
format if there is demand for it. The JSON format also allows many options 
regarding formatting. I generally don't test the Protobuf format code; I would treat 
it as a prototype because I know nobody who would like to use it. This code 
was planned for the Debezium request, but so far nobody cares.

For integration with other systems and languages, this is an open case. Actually, 
I am agnostic here. The data that is produced for output is stored in a buffer 
and can be sent to any target. This is done by the Writer module (you can look 
at the code), and there is a writer for Kafka, ZeroMQ, and even a plain 
TCP/IP network connection. I don't fully understand the question about adapting that 
better. If I have a specification, I can extend it. Say what you need.

In the case where we have a bidirectional connection (unlike with Kafka), the 
receiver can define the starting position (SCN) of the stream it wants 
to receive.

You can look at the prototype code to see how this communication would look: 
StreamClient.cpp, but please treat it as a working prototype. This is 
a client which just connects to OpenLogReplicator over the network, defines 
the starting SCN, and then just receives payloads.

In the case where:
- The connection is broken: the client would reconnect, tell the server the last 
confirmed position, and just ask for the following transactions
- OpenLogReplicator crashes: after restart, the client would tell the server the last 
confirmed position and ask for the following transactions
- The client crashes: the client would need to recover itself and ask for 
the transactions that come after the data it has already confirmed

I assume that if the client confirms that some SCN has been processed, 
OpenLogReplicator can remove it from the cache, and it is not possible that after 
a reconnect the client would demand data that it previously declared as 
confirmed.
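
Purely as an illustration of these resume semantics (OpenLogReplicator itself is C++,
and none of the names below are its real API), a receiver-side client could be shaped
roughly like this:

```java
// Illustrative sketch of the resume-from-confirmed-SCN idea with a hypothetical client
// API: the receiver persists the last confirmed SCN and asks the server to resume there.
public class ResumeFromScnClient {

    interface ReplicatorConnection extends AutoCloseable {
        void requestStartingScn(long scn);      // tell the server where to resume
        String nextPayload() throws Exception;  // blocking read of the next change event
    }

    private long lastConfirmedScn;               // durably stored by the receiver, not the server

    void run(ReplicatorConnection connection) throws Exception {
        // After a crash or reconnect, everything after lastConfirmedScn is requested again,
        // so duplicates are possible and the receiver should handle them idempotently.
        connection.requestStartingScn(lastConfirmedScn);
        while (true) {
            String payload = connection.nextPayload();
            process(payload);
            lastConfirmedScn = extractScn(payload);  // confirming lets the server drop it from its cache
        }
    }

    private void process(String payload) { /* application logic */ }

    private long extractScn(String payload) { return 0L; /* parse the SCN from the payload */ }
}
```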

Well, 
this is what is currently done. Some code was driven by requests from the 
Debezium team towards future integration, like support for Protobuf or putting some 
data into the payload, but it was never used.
We have opened a ticket in their Jira for integration: 
https://issues.redhat.com/projects/DBZ/issues/DBZ-2543?filter=allopenissues But 
there is no progress and no feedback on whether they want to do the integration or not. I 
have made some effort to allow easier integration but I'm not 

flink add multiple sink in sequence

2023-01-05 Thread Great Info
I have a stream from Kafka, after reading it and doing some
transformations/enrichment I need to store the final data stream in the
database and publish it to Kafka so I am planning to add two sinks like
below


finalInputStream.addSink(dataBaseSink);       // Sink1
finalInputStream.addSink(flinkKafkaProducer); // Sink2

Is the sequence guaranteed between Sink1 and Sink2? In my requirement, the
stream to Sink2 should begin only after Sink1 completes successfully;
if Sink1 fails, it should not write to Sink2.


[jira] [Created] (FLINK-30576) JdbcOutputFormat refactor

2023-01-05 Thread Jira
João Boto created FLINK-30576:
-

 Summary: JdbcOutputFormat refactor
 Key: FLINK-30576
 URL: https://issues.apache.org/jira/browse/FLINK-30576
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: João Boto


This refactor is to allow the use of JdbcOutputFormat in Sink2.

Currently, JdbcOutputFormat needs the RuntimeContext to check whether object reuse 
is enabled or not.

The refactor changes this from RuntimeContext to ExecutionConfig (we still 
need ExecutionConfig to be available on Sink2.InitContext, and a FLIP will be 
raised for that).
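
A minimal sketch of the idea, assuming (as the FLIP would have to establish) that
Sink2.InitContext can hand over an ExecutionConfig; the class and field names below
are illustrative, not the actual JdbcOutputFormat code:

```java
// Sketch only: decide the object-reuse behavior from an ExecutionConfig passed in
// directly, instead of reaching through RuntimeContext as the current format does.
import org.apache.flink.api.common.ExecutionConfig;

class JdbcOutputFormatSketch {

    private final boolean objectReuseEnabled;

    JdbcOutputFormatSketch(ExecutionConfig executionConfig) {
        // With object reuse enabled, buffered rows must be copied before the runtime reuses them.
        this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
    }

    boolean mustCopyBeforeBuffering() {
        return objectReuseEnabled;
    }
}
```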

 

[~wanglijie] this is what we talked about



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re:Re: [DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-05 Thread Wencong Liu
Hi, All


Thanks for the reply!


I think both John and Dong's opinions are reasonable. John's Suggestion 2 is a 
good implementation.
It does not affect the existing source connectors, and it also provides support 
for custom counters in future implementations.


WDYT?


Best,

Wencong Liu

[DISCUSS] FLIP-285: Refactoring the leader election code base in Flink

2023-01-05 Thread Matthias Pohl
Hi everyone,
I previously brought up FLINK-26522 [1] in the mailing list discussion about
consolidating the HighAvailabilityServices interfaces [2].
There, it was concluded that the community still wants the ability to have
per-component leader election and, therefore, to keep the
HighAvailabilityServices interface as is. I went back to work on
FLINK-26522 [1] to figure out how we can simplify the current codebase
while keeping that decision in mind.

I wanted to handle FLINK-26522 [1] as a follow-up cleanup task of
FLINK-24038 [3]. But while working on it, I realized that even FLINK-24038
[3] shouldn't have been handled without a FLIP. The per-process leader
election which was introduced in FLINK-24038 [3] changed the ownership of
certain components. This is actually a change that should have been
discussed in the mailing list and deserved a FLIP. To overcome this
shortcoming of FLINK-24038 [3], I decided to prepare FLIP-285 [4] to
provide proper documentation of what happened in FLINK-24038 and what will
be manifested by resolving its follow-up FLINK-26522 [1].

Conceptually, this FLIP proposes moving away from Flink's support for
single-contender LeaderElectionServices and introducing multi-contender
support by disconnecting the HA-backend leader election lifecycle from the
LeaderContender's lifecycle. This allows us to provide LeaderElection per
component (as requested in [2]) but also enables us to utilize a
single leader election for multiple components/contenders without
the complexity of the code that was introduced by FLINK-24038 [3].
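
To illustrate the direction (the names below are hypothetical and not the interfaces
proposed in the FLIP), a single election driving several contenders could be shaped
roughly like this:

```java
// Minimal illustrative sketch: one HA-backend election whose lifecycle is independent
// of any single contender, with components registering and deregistering against it.
public interface LeaderElectionSketch {

    /** One HA-backend election; it outlives any individual contender. */
    interface LeaderElection {
        // Register a component (e.g. Dispatcher, ResourceManager) under a stable id.
        Registration register(String componentId, Contender contender);
    }

    interface Contender {
        void grantLeadership(java.util.UUID leaderSessionId);
        void revokeLeadership();
    }

    interface Registration extends AutoCloseable {
        // Closing the registration removes the contender without shutting down the election.
        @Override
        void close();
    }
}
```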

I'm looking forward to your comments.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-26522
[2] https://lists.apache.org/thread/9oy2ml9s3j1v6r77h31sndhc3gw57cfm
[3] https://issues.apache.org/jira/browse/FLINK-24038
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box


[jira] [Created] (FLINK-30575) Autoscaler doesn't scale down on 0 load

2023-01-05 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30575:
--

 Summary: Autoscaler doesn't scale down on 0 load
 Key: FLINK-30575
 URL: https://issues.apache.org/jira/browse/FLINK-30575
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora


The logic for computing true processing rates relies on recordsProcessed / 
recordProcessingTime style computations, which do not really work well when 
everything is 0.

This leads to no scaling actions when the load suddenly drops to 0. We should 
handle these special cases and scale the pipeline down to some minimal parallelism.
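
As a tiny sketch of the special case: with nothing processed, the rate computation
degenerates to 0/0 (NaN), so a guard can fall back to a configured minimum parallelism.
The names and the fallback policy below are illustrative, not the operator's actual code.

```java
// Illustrative guard for the 0-load case in a rate-based autoscaler decision.
public final class TrueProcessingRateSketch {

    static double trueProcessingRate(double recordsProcessed, double recordProcessingTime) {
        if (recordProcessingTime == 0.0) {
            return Double.NaN;  // 0 / 0: no signal about the vertex's capacity
        }
        return recordsProcessed / recordProcessingTime;
    }

    static int targetParallelism(double rate, int currentParallelism, int minParallelism) {
        if (Double.isNaN(rate) || rate == 0.0) {
            return minParallelism;  // zero load: scale down instead of doing nothing
        }
        return currentParallelism;  // the real scaling decision happens elsewhere
    }
}
```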



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30574) Do not scale further up if last scaling was ineffective

2023-01-05 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30574:
--

 Summary: Do not scale further up if last scaling was ineffective
 Key: FLINK-30574
 URL: https://issues.apache.org/jira/browse/FLINK-30574
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


Related to https://issues.apache.org/jira/browse/FLINK-30571 but somewhat 
simpler. 

It should be easy to detect when a scale-up operation did not significantly 
improve performance (or even degraded it). In these cases it is very 
important not to try to scale further up.

This can happen if the job hits some external limit (such as rate limiting on an 
external service) or if the performance problems are caused by other factors 
such as memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30573) Table Store dedicated compact job may skip some records when checkpoint interval is long

2023-01-05 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-30573:
---

 Summary: Table Store dedicated compact job may skip some records 
when checkpoint interval is long
 Key: FLINK-30573
 URL: https://issues.apache.org/jira/browse/FLINK-30573
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Affects Versions: table-store-0.3.0, table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.3.0, table-store-0.4.0


Currently the sink for the Table Store dedicated compact job only receives records 
about which buckets have changed, not which files have changed. If the 
writer is kept open, new files in these buckets may be skipped.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-274: Introduce metric group for OperatorCoordinator

2023-01-05 Thread Hang Ruan
Hi, Martijn,

Thanks for your reply. I'm very glad to have your help reviewing it.
Let's keep this vote open until Jan 10th.

Best,
Hang

Leonard Xu wrote on Thursday, January 5, 2023 at 16:16:

> Thanks for driving this FLIP, Hang!
>
> +1 (binding)
>
> Best,
> Leonard
>
> > On Jan 5, 2023, at 3:08 AM, Martijn Visser 
> wrote:
> >
> > Hi Hang,
> >
> > I haven't had time to read the FLIP yet since this is still a holiday
> > period in Europe. I would like to read it in the next few days. Can you
> > keep the vote open a little longer?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, Jan 4, 2023 at 2:01 PM Dong Lin  wrote:
> >
> >> Thanks for proposing the FLIP!
> >>
> >> +1 (binding)
> >>
> >> Regards,
> >> Dong
> >>
> >> On Wed, Jan 4, 2023 at 10:08 AM Hang Ruan 
> wrote:
> >>
> >>> Hi all,
> >>>
> >>> Thanks for all the feedback so far.
> >>> Based on the discussion[1], we have come to a consensus, so I would
> like
> >> to
> >>> start a vote on FLIP-274: Introduce metric group for
> >>> OperatorCoordinator[2].
> >>>
> >>> The vote will last for at least 72 hours (Jan 7th at 11:00 GMT) unless
> >>> there is an objection or insufficient votes.
> >>>
> >>> [1] https://lists.apache.org/thread/63m9w60rndqnrqvgb6qosvt2bcbww53k
> >>> [2]
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
> >>>
> >>> Best,
> >>> Hang
> >>>
> >>
>
>


[jira] [Created] (FLINK-30572) Make parquet as default data file format

2023-01-05 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30572:


 Summary: Make parquet as default data file format
 Key: FLINK-30572
 URL: https://issues.apache.org/jira/browse/FLINK-30572
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.4.0


- We have done some tests. Parquet is 30% faster.
- After FLINK-30565, Parquet can support complex types and file systems such as 
OSS and S3 (decoupled from the Hadoop filesystem).
- After FLINK-30569, the table can switch formats at will.

Therefore, once detailed and comprehensive tests have been carried out here, we 
can use Parquet as the default format.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-280: Introduce a new explain mode to provide SQL advice

2023-01-05 Thread Jane Chan
Hi, devs,

After discussing with Godfrey, Lincoln, and Jark, I've updated the
FLIP document[1] and look forward to your opinions and suggestions.

The highlight difference is listed as follows.

   - *The proposed syntax changes from EXPLAIN ANALYZED_PHYSICAL_PLAN to
   EXPLAIN PLAN_ADVICE.* (A usage sketch follows after this list.)
      - The reason for changing the syntax is that the output format and the
      analyzed target are two orthogonal concepts and are better decoupled. On the
      other hand, users may care about the advice content instead of which plan
      is analyzed, and thus PHYSICAL should be kept hidden from users.

   - *The output format changes from JSON to the current tree-style text.
   Introduce ExplainFormat to classify the output format.*
      - The current output format is a mixture of plain text (AST,
      Optimized Physical Plan, and Optimized Execution Plan) and JSON (Physical
      Execution Plan, via EXPLAIN JSON_EXECUTION_PLAN), which is not structured
      and categorized. By introducing ExplainFormat, we can better classify the
      output format and have more flexibility to extend more formats in the
      future.

   - *The PlanAnalyzer installation gets rid of SPI.*
      - PlanAnalyzer should be an internal interface and not be exposed to
      users. Therefore, the Factory mechanism is unsuitable for this.
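
As a hypothetical usage sketch of the first point, assuming the FLIP is implemented as
described (the PLAN_ADVICE syntax does not exist yet, and the datagen table is only for
illustration):

```java
// Hypothetical usage of the proposed syntax; EXPLAIN PLAN_ADVICE only mirrors the
// FLIP's proposal and is not an existing Flink feature at the time of writing.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PlanAdviceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE orders (id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen')");
        // Expected to print the usual tree-style plan plus an additional advice section.
        tEnv.executeSql(
                        "EXPLAIN PLAN_ADVICE SELECT id, SUM(amount) FROM orders GROUP BY id")
                .print();
    }
}
```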


To Godfrey, Jingsong, and Shengkai: thanks for your comments and questions.

@Jingsong

> Can you give examples of other systems for the syntax?
> In other systems, is EXPLAIN ANALYZE already PHYSICAL_PLAN?
>

For other systems like MySQL[2], PostgreSQL[3], Presto[4], and TiDB[5],

EXPLAIN ANALYZE <statement>

is the mainstream syntax.

However, it represents an actual measurement of the cost, i.e., it will
execute the statement, which is unsuitable for this case.


`EXPLAIN ANALYZED_PHYSICAL_PLAN ` looks a bit strange, and even
> stranger that it contains `advice`. The purpose of FLIP seems to be a bit
> more to `advice`, so can we just
> introduce a syntax for `advice`?


Good point. After several discussions, the syntax has been updated to

EXPLAIN PLAN_ADVICE <statement>

@Godfrey

Do we really need to expose `PlanAnalyzerFactory` as public interface?
> I prefer we only expose ExplainDetail#ANALYZED_PHYSICAL_PLAN and the
> analyzed result.
> Which is enough for users and consistent with the results of `explain`
> method.The classes about plan analyzer are in table planner module, which
> does not public api (public interfaces should be defined in
> flink-table-api-java module). And PlanAnalyzer is depend on RelNode, which
> is internal class of planner, and not expose to users.


Good point. After reconsideration, the SPI mechanism is removed from the
FLIP. PlanAnalyzer should be an internal implementation much similar to a
RelOptRule, and should not be exposed to users.

@Shengkai

> 1. `PlanAnalyzer#analyze` uses the FlinkRelNode as the input. Could you
> share some thoughts about the motivation? In my experience, users mainly
> care about 2 things when they develop their job:

a. Why their SQL can not work? For example, their streaming SQL contains an
> OVER window but their ORDER key is not ROWTIME. In this case, we may not
> have a physical node or logical node because, during the optimization, the
> planner has already thrown the exception.
>

The prerequisite for providing advice is that the optimized physical plan can
be generated. The planner should throw exceptions if the query contains
syntax errors or other problems.



> b. Many users care about whether their state is compatible after upgrading
> their Flink version. In this case, I think the old execplan and the SQL
> statement are the user's input.


Good point. State compatibility detection is beneficial, but it is better
decoupled from EXPLAIN PLAN_ADVICE. We could provide a separate mechanism
for cross-version validation.


2. I am just curious how other people add the rules to the Advisor. When
> the number of rules increases, should all these rules be added to the Flink codebase?


It is very similar to adding a RelOptRule to a RuleSet. The number of
analyzers will not grow too fast, so adding them to the Flink
codebase may not be a concern.


3. How do users configure another advisor?


 After reconsideration, I would like to let PlanAdvisor be an internal
interface, which is different from implementing a custom connector/format.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-280%3A+Introduce+EXPLAIN+PLAN_ADVICE+to+provide+SQL+advice
[2] https://dev.mysql.com/doc/refman/8.0/en/explain.html#explain-analyze
[3] https://www.postgresql.org/docs/current/sql-explain.html
[4] https://prestodb.io/docs/current/sql/explain-analyze.html
[5] https://docs.pingcap.com/tidb/dev/sql-statement-explain-analyze

Best regards,
Jane

On Tue, Jan 3, 2023 at 6:20 PM Jingsong Li  wrote:

> Thanks Jane for the FLIP! It looks very nice!
>
> Can you give examples of other systems for the syntax?
> In other systems, is EXPLAIN ANALYZE already PHYSICAL_PLAN?

[jira] [Created] (FLINK-30571) Compute scale parallelism based on observed scalability

2023-01-05 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30571:
--

 Summary: Compute scale parallelism based on observed scalability 
 Key: FLINK-30571
 URL: https://issues.apache.org/jira/browse/FLINK-30571
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


When computing the target parallelism for job vertices, we currently assume linear 
scaling with a fixed coefficient of 1.

This assumes that in order to double the capacity we simply double the 
parallelism.

While linearity itself may already be violated by many real-time workloads, this form 
of strong linearity rarely holds due to the overhead of increased network 
traffic, coordination, etc.

Since we can access past (parallelism, processingRate) information from the 
scaling history, we should estimate the scalability coefficient using either a 
simple or a weighted linear regression.
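
As one possible reading of the simple case, here is an illustrative sketch (not the
operator's actual code) that fits processingRate as coefficient * parallelism by
ordinary least squares through the origin over the scaling history:

```java
// Illustrative: estimate a scalability coefficient from (parallelism, processingRate)
// history, assuming the model processingRate ~ coefficient * parallelism.
public final class ScalabilityCoefficientSketch {

    /** Least-squares slope of rate vs. parallelism, assuming rate(0) = 0. */
    static double estimateCoefficient(double[] parallelism, double[] processingRate) {
        double num = 0.0;
        double den = 0.0;
        for (int i = 0; i < parallelism.length; i++) {
            num += parallelism[i] * processingRate[i];
            den += parallelism[i] * parallelism[i];
        }
        return den == 0.0 ? Double.NaN : num / den;
    }

    /** Target parallelism for a desired rate under the estimated (sub)linear scaling. */
    static int targetParallelism(double desiredRate, double coefficient) {
        return (int) Math.ceil(desiredRate / coefficient);
    }
}
```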



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates

2023-01-05 Thread Aitozi (Jira)
Aitozi created FLINK-30570:
--

 Summary: RexNodeExtractor#isSupportedPartitionPredicate generates 
unexpected partition predicates
 Key: FLINK-30570
 URL: https://issues.apache.org/jira/browse/FLINK-30570
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Aitozi


Currently, the condition {{where rand(1) > 0.0125}} will be recognized as a 
partition predicate and will be evaluated to false when compiling the SQL. This 
has two problems.
First, it should not be recognized as a partition predicate, and a 
nondeterministic function should never pass the partition pruner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-274: Introduce metric group for OperatorCoordinator

2023-01-05 Thread Leonard Xu
Thanks for driving this FLIP, Hang!

+1 (binding)

Best,
Leonard

> On Jan 5, 2023, at 3:08 AM, Martijn Visser  wrote:
> 
> Hi Hang,
> 
> I haven't had time to read the FLIP yet since this is still a holiday
> period in Europe. I would like to read it in the next few days. Can you
> keep the vote open a little longer?
> 
> Best regards,
> 
> Martijn
> 
> On Wed, Jan 4, 2023 at 2:01 PM Dong Lin  wrote:
> 
>> Thanks for proposing the FLIP!
>> 
>> +1 (binding)
>> 
>> Regards,
>> Dong
>> 
>> On Wed, Jan 4, 2023 at 10:08 AM Hang Ruan  wrote:
>> 
>>> Hi all,
>>> 
>>> Thanks for all the feedback so far.
>>> Based on the discussion[1], we have come to a consensus, so I would like
>> to
>>> start a vote on FLIP-274: Introduce metric group for
>>> OperatorCoordinator[2].
>>> 
>>> The vote will last for at least 72 hours (Jan 7th at 11:00 GMT) unless
>>> there is an objection or insufficient votes.
>>> 
>>> [1] https://lists.apache.org/thread/63m9w60rndqnrqvgb6qosvt2bcbww53k
>>> [2]
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
>>> 
>>> Best,
>>> Hang
>>> 
>>