Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-02 Thread Jark Wu
Hi Timo,

I will respond to some of the questions:

1) SQL client specific options

Whether it starts with "table" or "sql-client" depends on where the
configuration takes effect.
If it is a table configuration, we should make clear what the behavior is
when users change the configuration during the lifecycle of the
TableEnvironment.

I agree with Shengkai that `sql-client.planner` and `sql-client.execution.mode`
are special in that they can't be changed after the TableEnvironment has been
initialized. Note that `StreamExecutionEnvironment` provides a `configure()`
method to override the configuration after the StreamExecutionEnvironment has
been initialized.
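For reference, a minimal sketch of that existing pattern (the `configure()`
method does exist on `StreamExecutionEnvironment`; the option key used here is
just a placeholder):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigureExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Override parts of the configuration after the environment exists.
        Configuration conf = new Configuration();
        conf.setString("pipeline.max-parallelism", "128");
        env.configure(conf, ConfigureExample.class.getClassLoader());
    }
}
```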

Therefore, I think it would be better to still use  `sql-client.planner`
and `sql-client.execution.mode`.

2) Execution file

From my point of view, there is a big difference between
`sql-client.job.detach` and `TableEnvironment.executeMultiSql()`:
`sql-client.job.detach` will affect every single DML statement in the
terminal, not only the statements in SQL files. I think a single DML
statement in the interactive terminal is something like tEnv#executeSql()
rather than tEnv#executeMultiSql(), so I don't like the "multi" and "sql"
keywords in `table.multi-sql-async`.
I just found that the runtime provides a configuration called
"execution.attached" [1], false by default, which specifies whether the
pipeline is submitted in attached or detached mode. It provides exactly the
same functionality as `sql-client.job.detach`. What do you think about using
this option?

If we also want to support this config in TableEnvironment, I think it
should also affect the DML execution of `tEnv#executeSql()`, not only DMLs
in `tEnv#executeMultiSql()`. Therefore, the behavior may look like this:

val tableResult = tEnv.executeSql("INSERT INTO ...")   ==> async by default
tableResult.await()                                    ==> manually block until finished
tEnv.getConfig().getConfiguration().setString("execution.attached", "true")
val tableResult2 = tEnv.executeSql("INSERT INTO ...")  ==> sync, no need to wait on the TableResult
tEnv.executeMultiSql(
"""
CREATE TABLE ...       ==> always sync
INSERT INTO ...        ==> sync, because we set the configuration above
SET execution.attached = false;
INSERT INTO ...        ==> async
""")

On the other hand, I think `sql-client.job.detach`
and `TableEnvironment.executeMultiSql()` should be two separate topics;
as Shengkai mentioned above, the SQL CLI only depends on
`TableEnvironment#executeSql()` to support multi-line statements.
I'm fine with clarifying `executeMultiSql()`, but I don't want it to block
this FLIP; maybe we can discuss it in another thread.


Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#execution-attached

On Wed, 3 Feb 2021 at 15:33, Shengkai Fang  wrote:

> Hi, Timo.
> Thanks for your detailed feedback. I have some thoughts on it.
>
> *Regarding #1*: I think the main problem is whether the table environment
> has the ability to update itself. Let's take a simple program as an
> example.
>
>
> ```
> TableEnvironment tEnv = TableEnvironment.create(...);
>
> tEnv.getConfig().getConfiguration().setString("table.planner", "old");
>
>
> tEnv.executeSql("...");
>
> ```
>
> If we regard this option as a table option, users don't have to create
> another table environment manually. In that case, tEnv needs to check
> whether the current mode and planner are the same as before whenever
> executeSql or explainSql is called. I don't think that's easy work for the
> table environment, especially if users have a StreamExecutionEnvironment
> but set the old planner and batch mode. But if we make this option a sql
> client option, users only use the SET command to change the setting, and
> we can rebuild a new table environment when the SET succeeds.
>
>
> *Regarding #2*: I think we need to discuss the implementation before
> continuing this topic. In the sql client, we will maintain two parsers. The
> first parser (the client parser) will only match the sql client commands.
> If the client parser can't parse the statement, we will leverage the power
> of the table environment to execute it. According to our blueprint,
> TableEnvironment#executeSql is enough for the sql client. Therefore,
> TableEnvironment#executeMultiSql is out of scope for this FLIP.
>
> But if we need to introduce `TableEnvironment.executeMultiSql` in the
> future, I think it's OK to use the option `table.multi-sql-async` rather
> than the option `sql-client.job.detach`. But we think the name is not
> suitable because it is confusing: setting the option to false only means
> blocking the execution of INSERT INTO statements, not DDL or other
> statements (which are always executed synchronously). So how about
> `table.job.async`? It only works for the sql client and executeMultiSql.
> If we set this value to false, the table environment will not return the
> result until the job finishes.
>
>
> *Regarding #3, #4*: I still think we should use DELETE JAR and LIST JAR
> 

[jira] [Created] (FLINK-21247) flink iceberg table map cannot convert to datastream

2021-02-02 Thread donglei (Jira)
donglei created FLINK-21247:
---

 Summary: flink iceberg table map cannot convert to 
datastream
 Key: FLINK-21247
 URL: https://issues.apache.org/jira/browse/FLINK-21247
 Project: Flink
  Issue Type: New Feature
 Environment: iceberg master

flink 1.12

 

 
Reporter: donglei
 Attachments: image-2021-02-03-15-38-42-340.png, 
image-2021-02-03-15-40-27-055.png, image-2021-02-03-15-41-34-426.png, 
image-2021-02-03-15-43-19-919.png

Flink Iceberg table with a map column:
!image-2021-02-03-15-38-42-340.png!
 
We want to read the table like this:
 
String querySql = "SELECT ftime,extinfo,country,province,operator,apn,gw,src_ip_head,info_str,product_id,app_version,sdk_id,sdk_version,hardware_os,qua,upload_ip,client_ip,upload_apn,event_code,event_result,package_size,consume_time,event_value,event_time,upload_time,boundle_id,uin,platform,os_version,channel,brand,model from bfzt3";
Table table = tEnv.sqlQuery(querySql);

DataStream<AttaInfo> sinkStream = tEnv.toAppendStream(table, Types.POJO(AttaInfo.class, map));

sinkStream.map(x -> 1).returns(Types.INT).keyBy(new NullByteKeySelector()).reduce((x, y) -> {
    return x + y;
}).print();
 
 
when reading, we find an exception:
 
2021-02-03 15:37:57
java.lang.ClassCastException: 
org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableMapData cannot be 
cast to org.apache.flink.table.data.binary.BinaryMapData
at 
org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:107)
at 
org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:47)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:166)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:129)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
 
we find that the Iceberg map is ReusableMapData, which implements MapData:
!image-2021-02-03-15-40-27-055.png!
 
this is the exception:
!image-2021-02-03-15-41-34-426.png!
MapData has two default implementations, GenericMapData and BinaryMapData;
the Iceberg implementation is ReusableMapData.
 
so I think the code should be changed like this:
!image-2021-02-03-15-43-19-919.png!
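In text form, a minimal sketch of the suggested change (illustrative, not the
exact patch in the attached image; it assumes BinaryMapData#copy and
ArrayData.createElementGetter as used elsewhere in flink-table-runtime): only
cast when the input really is a BinaryMapData, and fall back to a generic copy
for other MapData implementations such as Iceberg's ReusableMapData.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.binary.BinaryMapData;
import org.apache.flink.table.types.logical.LogicalType;

final class MapDataCopyFix {

    // keyType/valueType would come from the MapDataSerializer's fields.
    static MapData copy(MapData from, LogicalType keyType, LogicalType valueType) {
        if (from instanceof BinaryMapData) {
            return ((BinaryMapData) from).copy();
        }
        // Fallback for non-binary implementations: materialize the entries
        // into a GenericMapData instead of casting.
        ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
        ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
        ArrayData keys = from.keyArray();
        ArrayData values = from.valueArray();
        Map<Object, Object> entries = new HashMap<>(from.size());
        for (int i = 0; i < from.size(); i++) {
            entries.put(
                    keyGetter.getElementOrNull(keys, i),
                    valueGetter.getElementOrNull(values, i));
        }
        return new GenericMapData(entries);
    }
}
```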
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-158: Generalized incremental checkpoints

2021-02-02 Thread Arvid Heise
FLIP looks good to me. +1

On Wed, Feb 3, 2021 at 8:00 AM Piotr Nowojski  wrote:

> I'm carrying over my +1 from the discussion thread.
>
> Piotrek
>
> śr., 3 lut 2021 o 05:55 Yuan Mei  napisał(a):
>
> > As aforementioned in the discussion thread, +1 on this Flip vote.
> >
> > On Wed, Feb 3, 2021 at 6:36 AM Khachatryan Roman <
> > khachatryan.ro...@gmail.com> wrote:
> >
> > > Hi everyone,
> > >
> > > I'd like to start a vote on FLIP-158 [1] which was discussed in [2].
> > >
> > > The vote will be open for at least 72 hours. Unless there are any
> > > objections,
> > > I'll close it by February 5, 2021 if we have received sufficient votes.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
> > >
> > > [2]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-158-Generalized-incremental-checkpoints-td47902.html
> > >
> > >
> > >
> > > Regards,
> > > Roman
> > >
> >
>


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-02 Thread Shengkai Fang
Hi, Timo.
Thanks for your detailed feedback. I have some thoughts on it.

*Regarding #1*: I think the main problem is whether the table environment
has the ability to update itself. Let's take a simple program as an example.


```
TableEnvironment tEnv = TableEnvironment.create(...);

tEnv.getConfig().getConfiguration().setString("table.planner", "old");


tEnv.executeSql("...");

```

If we regard this option as a table option, users don't have to create
another table environment manually. In that case, tEnv needs to check
whether the current mode and planner are the same as before whenever
executeSql or explainSql is called. I don't think that's easy work for the
table environment, especially if users have a StreamExecutionEnvironment but
set the old planner and batch mode. But if we make this option a sql client
option, users only use the SET command to change the setting, and we can
rebuild a new table environment when the SET succeeds.
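A minimal sketch of that client-side rebuild, assuming hypothetical SQL-client
internals (the session config field and option keys here are illustrative, not
proposed API):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

final class SessionContext {
    private final Configuration sessionConfig = new Configuration();
    private TableEnvironment tableEnv;

    void onSetCommand(String key, String value) {
        sessionConfig.setString(key, value);
        // These options are fixed at creation time, so a successful SET
        // recreates the environment instead of mutating the existing one.
        if ("sql-client.planner".equals(key) || "sql-client.execution.mode".equals(key)) {
            EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
            if ("batch".equals(sessionConfig.getString("sql-client.execution.mode", "streaming"))) {
                builder.inBatchMode();
            } else {
                builder.inStreamingMode();
            }
            tableEnv = TableEnvironment.create(builder.build());
        }
    }
}
```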


*Regarding #2*: I think we need to discuss the implementation before
continuing this topic. In the sql client, we will maintain two parsers. The
first parser (the client parser) will only match the sql client commands. If
the client parser can't parse the statement, we will leverage the power of the
table environment to execute it. According to our blueprint,
TableEnvironment#executeSql is enough for the sql client. Therefore,
TableEnvironment#executeMultiSql is out of scope for this FLIP.
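A minimal sketch of that two-parser delegation, with hypothetical names for
the client-side pieces:

```java
import java.util.Optional;
import org.apache.flink.table.api.TableEnvironment;

final class StatementDispatcher {
    private final ClientParser clientParser; // hypothetical client-command parser
    private final TableEnvironment tableEnv;

    StatementDispatcher(ClientParser clientParser, TableEnvironment tableEnv) {
        this.clientParser = clientParser;
        this.tableEnv = tableEnv;
    }

    void dispatch(String statement) {
        // First try client-only commands (SET, QUIT, ...); everything else
        // goes straight to the table environment.
        Optional<Runnable> clientCommand = clientParser.tryParse(statement);
        if (clientCommand.isPresent()) {
            clientCommand.get().run();
        } else {
            tableEnv.executeSql(statement);
        }
    }

    interface ClientParser {
        Optional<Runnable> tryParse(String statement);
    }
}
```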

But if we need to introduce `TableEnvironment.executeMultiSql` in the
future, I think it's OK to use the option `table.multi-sql-async` rather
than the option `sql-client.job.detach`. But we think the name is not
suitable because it is confusing: setting the option to false only means
blocking the execution of INSERT INTO statements, not DDL or other
statements (which are always executed synchronously). So how about
`table.job.async`? It only works for the sql client and executeMultiSql.
If we set this value to false, the table environment will not return the
result until the job finishes.


*Regarding #3, #4*: I still think we should use DELETE JAR and LIST JAR
because Hive also uses these commands to add jars to the classpath or remove
them. If we use such commands, it reduces our work for Hive compatibility.

For SHOW JAR, I think the main concern is that the jars are not maintained by
the Catalog. If we really need to stay consistent with SQL grammar, maybe we
should use

`ADD JAR` -> `CREATE JAR`,
`DELETE JAR` -> `DROP JAR`,
`LIST JAR` -> `SHOW JAR`.

*Regarding #5*: I agree with you that we'd better keep consistent.

*Regarding #6*: Yes. Most of the commands should belong to the table
environment. In the Summary section, I use a tag to identify which commands
belong to the sql client and which belong to the table environment. I have
also added a new section about implementation details to the FLIP.

Best,
Shengkai

Timo Walther  于2021年2月2日周二 下午6:43写道:

> Thanks for this great proposal Shengkai. This will give the SQL Client a
> very good update and make it production ready.
>
> Here is some feedback from my side:
>
> 1) SQL client specific options
>
> I don't think that `sql-client.planner` and `sql-client.execution.mode`
> are SQL Client specific. Similar to `StreamExecutionEnvironment` and
> `ExecutionConfig#configure` that have been added recently, we should
> offer a possibility for TableEnvironment. How about we offer
> `TableEnvironment.create(ReadableConfig)` and add a `table.planner` and
> `table.execution-mode` to
> `org.apache.flink.table.api.config.TableConfigOptions`?
>
> 2) Execution file
>
> Did you have a look at the Appendix of FLIP-84 [1] including the mailing
> list thread at that time? Could you further elaborate how the
> multi-statement execution should work for a unified batch/streaming
> story? According to our past discussions, each line in an execution file
> should be executed blocking, which means a streaming query needs a
> statement set to execute multiple INSERT INTO statements, correct? We
> should also offer this functionality in
> `TableEnvironment.executeMultiSql()`. Whether `sql-client.job.detach` is
> SQL Client specific needs to be determined; it could also be a general
> `table.multi-sql-async` option.
>
> 3) DELETE JAR
>
> Shouldn't the opposite of "ADD" be "REMOVE"? "DELETE" sounds like one is
> actively deleting the JAR in the corresponding path.
>
> 4) LIST JAR
>
> This should be `SHOW JARS` according to other SQL commands such as `SHOW
> CATALOGS`, `SHOW TABLES`, etc. [2].
>
> 5) EXPLAIN [ExplainDetail[, ExplainDetail]*]
>
> We should keep the details in sync with
> `org.apache.flink.table.api.ExplainDetail` and avoid confusion about
> differently named ExplainDetails. I would vote for `ESTIMATED_COST`
> instead of `COST`. I'm sure the original author had a reason to call
> it that way.
>
> 6) Implementation details
>
> It would be nice to understand how we plan to implement the given
> 

[jira] [Created] (FLINK-21246) Decline Checkpoint if some tasks finished before get triggered

2021-02-02 Thread Yun Gao (Jira)
Yun Gao created FLINK-21246:
---

 Summary: Decline Checkpoint if some tasks finished before get 
triggered
 Key: FLINK-21246
 URL: https://issues.apache.org/jira/browse/FLINK-21246
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Runtime / Checkpointing
Reporter: Yun Gao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-158: Generalized incremental checkpoints

2021-02-02 Thread Piotr Nowojski
I'm carrying over my +1 from the discussion thread.

Piotrek

śr., 3 lut 2021 o 05:55 Yuan Mei  napisał(a):

> As aforementioned in the discussion thread, +1 on this Flip vote.
>
> On Wed, Feb 3, 2021 at 6:36 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-158 [1] which was discussed in [2].
> >
> > The vote will be open for at least 72 hours. Unless there are any
> > objections,
> > I'll close it by February 5, 2021 if we have received sufficient votes.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
> >
> > [2]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-158-Generalized-incremental-checkpoints-td47902.html
> >
> >
> >
> > Regards,
> > Roman
> >
>


[jira] [Created] (FLINK-21245) Support StreamExecCalc json serialization/deserialization

2021-02-02 Thread godfrey he (Jira)
godfrey he created FLINK-21245:
--

 Summary: Support StreamExecCalc json serialization/deserialization
 Key: FLINK-21245
 URL: https://issues.apache.org/jira/browse/FLINK-21245
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-02 Thread Xintong Song
Thanks for your feedback, Kezhu.

I think the Flink *runtime* already has an ideal granularity for resource
> management: the 'task'. If there is a slot shared by multiple tasks, that
> slot's resource requirement is simply the sum of all its logical slots. So
> basically, there is no resource requirement for the SlotSharingGroup in the
> runtime until now, right?

That is a half-finished implementation, left over from previous attempts
(years ago) to deliver the fine-grained resource management feature, and it
was never really put into use.

> From the FLIP and discussion, I assume that SSG resource specifying will
> override operator-level
> resource specifying if both are specified?
>
Actually, I think we should use the finer-grained resources (i.e. operator
level) if both are specified. And more importantly, that is based on the
assumption that we do need two different levels of interfaces.

So, I wonder whether we could interpret SSG resource specifying as an "add"
> rather than a "set" on
> resource requirements?
>
IIUC, this is the core idea behind your proposal. I think it provides an
interesting idea of how we could combine operator-level and SSG-level
resources, *if we allow configuring resources at both levels*. However, I'm
not sure whether configuring resources at the operator level is indeed
needed. Therefore, as a first step, this FLIP proposes to only introduce the
SSG-level interfaces. As listed in the future plan, we would consider
allowing operator-level resource configuration later if we do see a need for
it. At that time, we definitely should discuss what to do if resources are
configured at both levels.

* Could an SSG express a negative resource requirement?
>
No.

Is there a concrete barrier that makes partially configured resources not
> work? I saw it will fail job submission in Dispatcher.submitJob.
>
With the SSG-based approach, this should no longer be needed. The
constraint was introduced because we could neither properly define the
resources of a task chained from an operator with specified resources and
one with unspecified resources, nor the resources of a slot shared by a task
with specified resources and one with unspecified resources. With the
SSG-based approach, we no longer have those problems.

An option (cluster/job level) to force slot sharing in the scheduler? This
> could be useful in case of migration from FLIP-156 to a future approach.
>
I think this is exactly what we are trying to avoid, requiring the
scheduler to enforce slot sharing.

An option (cluster level) to ignore resource specifications (allowing a job
> with specified resources to run in an open-box environment) for
> non-production usage?
>
That's possible. Actually, we are planning to introduce an option for
activating fine-grained resource management, for development purposes. We
might consider keeping that option after the feature is completed, to allow
disabling the feature without having to touch the job code.

Thank you~

Xintong Song



On Wed, Feb 3, 2021 at 1:28 PM Kezhu Wang  wrote:

> Hi all, sorry for joining the discussion even after voting started.
>
> I want to share my thoughts on this after reading the above discussions.
>
> I think the Flink *runtime* already has an ideal granularity for resource
> management: the 'task'. If there is a slot shared by multiple tasks, that
> slot's resource requirement is simply the sum of all its logical slots. So
> basically, there is no resource requirement for the SlotSharingGroup in the
> runtime until now, right?
>
> As in the discussion, we already agree upon this: "If all operators have
> their resources properly specified, then slot sharing is no longer needed."
>
> So it seems to me that what we would naturally discuss is: how to bridge
> impractical operator-level resource specifying to runtime task-level
> resource requirements? This is actually a pure API thing, as Chesnay has
> pointed out.
>
> But FLIP-156 brings another direction to the table: how about using the SSG
> for both API and runtime resource specifying?
>
> From the FLIP and discussion, I assume that SSG resource specifying will
> override operator-level resource specifying if both are specified?
>
> So, I wonder whether we could interpret SSG resource specifying as an "add"
> rather than a "set" on resource requirements?
>
> The semantics is that SSG resource specifying adds additional resources to
> the shared slot, to express concerns about possibly high throughput and
> resource requirements for the tasks in one physical slot.
>
> The result is that if the scheduler indeed respects slot sharing, the
> allocated slot will gain the extra resources specified for that SSG.
>
> I think one of the coding barriers of the "add" approach is
> ResourceSpec.UNKNOWN, which doesn't support a 'merge' operation. I tend to
> use ResourceSpec.ZERO as the default; the task executor should be aware of
> this.
>
> @Chesnay
> > My main worry is that if we wire the runtime to work on SSGs it's
> > gonna be difficult to implement more fine-grained approaches, which
> > would not be the case if, for the runtime, they are always defined on an
> > operator-level.

Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-02 Thread Yangze Guo
Hi, Kezhu.

Thanks for your feedback.

> Flink *runtime* already has an ideal granularity for resource management:
> the 'task'.
As mentioned in the FLIP, there is some ancient code in the Flink code base,
but that code was never really used or exposed to users. So, there are
actually no operator- or SSG-level resource requirements yet, but the slot
is already the basic unit for resource management in Flink's runtime.

> that SSG resource specifying will override operator-level resource
> specifying if both are specified
We now treat operator-level resource specifying as a potential follow-up for
fine-grained resource management. We need to collect more feedback to decide
whether we really need it. Regarding whether and how to allow a hybrid (SSG
+ OP) configuration, I think there is little point in discussing it at
present.

IIUC, your proposal is based on the assumption that we already have
operator-level resource configuration and targets how to determine the slot
resource spec when both configurations exist.
- First, we are not yet sure that we need operator-level resource
configuration.
- Second, we are not even sure whether we need to support a hybrid
configuration.

So, as written in the future plan, I tend to first collect feedback on the
operator-level resource configuration interface once fine-grained resource
management is ready. Then, we can consider further optimizations, such as
your proposal.


Best,
Yangze Guo

On Wed, Feb 3, 2021 at 1:28 PM Kezhu Wang  wrote:
>
> Hi all, sorry for joining the discussion even after voting started.
>
> I want to share my thoughts on this after reading the above discussions.
>
> I think the Flink *runtime* already has an ideal granularity for resource
> management: the 'task'. If there is a slot shared by multiple tasks, that
> slot's resource requirement is simply the sum of all its logical slots. So
> basically, there is no resource requirement for the SlotSharingGroup in the
> runtime until now, right?
>
> As in the discussion, we already agree upon this: "If all operators have
> their resources properly specified, then slot sharing is no longer needed."
>
> So it seems to me that what we would naturally discuss is: how to bridge
> impractical operator-level resource specifying to runtime task-level
> resource requirements? This is actually a pure API thing, as Chesnay has
> pointed out.
>
> But FLIP-156 brings another direction to the table: how about using the SSG
> for both API and runtime resource specifying?
>
> From the FLIP and discussion, I assume that SSG resource specifying will
> override operator-level resource specifying if both are specified?
>
> So, I wonder whether we could interpret SSG resource specifying as an "add"
> rather than a "set" on resource requirements?
>
> The semantics is that SSG resource specifying adds additional resources to
> the shared slot, to express concerns about possibly high throughput and
> resource requirements for the tasks in one physical slot.
>
> The result is that if the scheduler indeed respects slot sharing, the
> allocated slot will gain the extra resources specified for that SSG.
>
> I think one of the coding barriers of the "add" approach is
> ResourceSpec.UNKNOWN, which doesn't support a 'merge' operation. I tend to
> use ResourceSpec.ZERO as the default; the task executor should be aware of
> this.
>
> @Chesnay
> > My main worry is that if we wire the runtime to work on SSGs it's
> > gonna be difficult to implement more fine-grained approaches, which
> > would not be the case if, for the runtime, they are always defined on an
> > operator-level.
>
> An "add" operation should be less invasive and set a low barrier for
> future fine-grained approaches.
>
> @Stephan
> >   - Users can define different slot sharing groups for operators like
> > they do now, with the exception that you cannot mix operators that have
> > a resource profile and operators that have no resource profile.
>
> @Till
> > This effectively means that all unspecified operators
> > will implicitly have a zero resource requirement.
> > I am wondering whether this wouldn't lead to a surprising behaviour for
> > the user. If the user specifies the resource requirements for a single
> > operator, then he probably will assume that the other operators will get
> > the default share of resources and not nothing.
>
> I think it is inherent, due to the fact that we cannot define
> ResourceSpec.ONE, e.g. a resource requirement for exactly one default
> slot, with concrete numbers. I tend to squash out the unspecified ones if
> there are operators in a chain with explicit resource specifications.
> Otherwise, the protocol tends to be verbose, as in saying "give me this
> much resource and a default". I think if we have explicit resource
> specifications for only some operators, it is just saying "I don't care
> about the other operators that much, just get them places to run". These
> are most likely cases of stateless filter/map or other less
> resource-consuming operators. If there is indeed a problem, I think
> clients can specify a global default (or other level defaults in the
> future). In the job-graph generating phase, we could take that default
> into account for unspecified operators.

[jira] [Created] (FLINK-21244) shouldPerformUnalignedCheckpointOnLocalAndRemoteChannel Fail

2021-02-02 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21244:
-

 Summary: shouldPerformUnalignedCheckpointOnLocalAndRemoteChannel 
Fail
 Key: FLINK-21244
 URL: https://issues.apache.org/jira/browse/FLINK-21244
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Guowei Ma


 
 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12814=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=981eced9-6683-5752-3201-62faf56c149b
 
 
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:435)
 
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
 
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
 
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
 at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
 at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
 at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
 ... 4 more 
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
tolerable failure threshold. 
 at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:90)
 
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1889)
 
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:996)
 
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$6(SchedulerBase.java:1026)
 
 at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
 at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 
 at java.base/java.lang.Thread.run(Thread.java:834)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-02 Thread Jark Wu
Hi Jane,

Yes. I think we should fail fast.
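A minimal sketch of that fail-fast behavior, using hypothetical ModuleManager
internals (names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ModuleManagerSketch {
    private final Map<String, Object> loadedModules = new LinkedHashMap<>();
    private List<String> usedModules = new ArrayList<>();

    void useModules(List<String> names) {
        // Validate the whole list first so that a single bad name
        // fails the statement without changing any state.
        for (String name : names) {
            if (!loadedModules.containsKey(name)) {
                throw new IllegalArgumentException(
                        "No module named '" + name + "' is loaded");
            }
        }
        usedModules = new ArrayList<>(names);
    }
}
```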

Best,
Jark

On Wed, 3 Feb 2021 at 12:06, Jane Chan  wrote:

> Hi everyone,
>
> Thanks for the discussion to make this improvement plan clearer.
>
> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion summaries
> now and want to confirm one thing: for the statement `USE MODULES x [,
> y, z, ...]`, if the module name list contains a nonexistent module, shall
> we #1 fail the execution for all of them, or #2 enable the remaining
> modules and return a warning to users? My personal preference goes to #1
> for simplicity. What do you think?
>
> Best,
> Jane
>
> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther  wrote:
>
> > +1
> >
> > @Jane Can you summarize our discussion in the JIRA issue?
> >
> > Thanks,
> > Timo
> >
> >
> > On 02.02.21 03:50, Jark Wu wrote:
> > > Hi Timo,
> > >
> > >> Another question is whether a LOAD operation also adds the module to
> the
> > > enabled list by default?
> > >
> > > I would like to add the module to the enabled list by default, the main
> > > reasons are:
> > > 1) Reordering is an advanced requirement, and needing additional
> > > USE statements with the "core" module when adding modules
> > > sounds too burdensome. Most users should be satisfied with only LOAD
> > > statements.
> > > 2) We should keep compatibility with TableEnvironment#loadModule().
> > > 3) We are using the LOAD statement instead of CREATE, so I think it's
> > fine
> > > that it does some implicit things.
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 2 Feb 2021 at 00:48, Timo Walther  wrote:
> > >
> > >> Not the module itself but the ModuleManager should handle this case,
> > yes.
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 01.02.21 17:35, Jane Chan wrote:
> > >>> +1 to Jark's proposal
> > >>>
> > >>>To make it clearer,  will `module#getFunctionDefinition()` return
> > empty
> > >>> suppose the module is loaded but not enabled?
> > >>>
> > >>> Best,
> > >>> Jane
> > >>>
> > >>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther 
> > wrote:
> > >>>
> >  +1 to Jark's proposal
> > 
> >  I like the difference between just loading and actually enabling
> these
> >  modules.
> > 
> >  @Rui: I would use the same behavior as catalogs here. You cannot
> > `USE` a
> >  catalog without creating it before.
> > 
> >  Another question is whether a LOAD operation also adds the module to
> > the
> >  enabled list by default?
> > 
> >  Regards,
> >  Timo
> > 
> >  On 01.02.21 13:52, Rui Li wrote:
> > > If `USE MODULES` implies unloading modules that are not listed,
> does
> > it
> > > also imply loading modules that are not previously loaded,
> especially
> >  since
> > > we're mapping modules by name now?
> > >
> > > On Mon, Feb 1, 2021 at 8:20 PM Jark Wu  wrote:
> > >
> > >> I agree with Timo that the USE implies the specified modules are
> in
> > >> use
> >  in
> > >> the specified order and others are not used.
> > >> This would be easier to know what's the result list and order
> after
> > >> the
> >  USE
> > >> statement.
> > >> That means: if current modules in order are x, y, z. And `USE
> > MODULES
> >  z, y`
> > >> means current modules in order are z, y.
> > >>
> > >> But I would like to not unload the unmentioned modules in the USE
> > >> statement, because it seems strange that USE would implicitly remove
> > >> modules. In the above example, if the user types a wrong module list
> > >> with USE by mistake and wants to declare the list again, they would
> > >> have to create the modules again with properties they may not know.
> > >> Therefore, I propose that the USE statement just specifies the
> > >> current module list and doesn't unload modules.
> > >> Besides that, we may need a new syntax to list all the modules
> > >> including
> > >> not used but loaded.
> > >> We can introduce SHOW FULL MODULES for this purpose with an
> > additional
> > >> `used` column.
> > >>
> > >> For example:
> > >>
> > >> Flink SQL> list modules:
> > >> ---
> > >> | modules |
> > >> ---
> > >> | x   |
> > >> | y   |
> > >> | z   |
> > >> ---
> > >> Flink SQL> USE MODULES z, y;
> > >> Flink SQL> show modules:
> > >> ---
> > >> | modules |
> > >> ---
> > >> | z   |
> > >> | y   |
> > >> ---
> > >> Flink SQL> show FULL modules;
> > >> ---
> > >> | modules |  used |
> > >> ---
> > >> | z   | true  |
> > >> | y   | true  |
> > >> | x   | false |
> > >> ---
> > >> Flink SQL> USE MODULES z, y, x;
> > >> Flink SQL> show modules;
> > >> ---
> > >> | modules |
> > >> ---
> > >> | z   

Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-02 Thread Kezhu Wang
Hi all, sorry for joining the discussion even after voting started.

I want to share my thoughts on this after reading the above discussions.

I think the Flink *runtime* already has an ideal granularity for resource
management: the 'task'. If there is a slot shared by multiple tasks, that
slot's resource requirement is simply the sum of all its logical slots. So
basically, there is no resource requirement for the SlotSharingGroup in the
runtime until now, right?

As in the discussion, we already agree upon this: "If all operators have
their resources properly specified, then slot sharing is no longer needed."

So it seems to me that what we would naturally discuss is: how to bridge
impractical operator-level resource specifying to runtime task-level
resource requirements? This is actually a pure API thing, as Chesnay has
pointed out.

But FLIP-156 brings another direction to the table: how about using the SSG
for both API and runtime resource specifying?

From the FLIP and discussion, I assume that SSG resource specifying will
override operator-level resource specifying if both are specified?

So, I wonder whether we could interpret SSG resource specifying as an "add"
rather than a "set" on resource requirements?

The semantics is that SSG resource specifying adds additional resources to
the shared slot, to express concerns about possibly high throughput and
resource requirements for the tasks in one physical slot.

The result is that if the scheduler indeed respects slot sharing, the
allocated slot will gain the extra resources specified for that SSG.

I think one of the coding barriers of the "add" approach is
ResourceSpec.UNKNOWN, which doesn't support a 'merge' operation. I tend to
use ResourceSpec.ZERO as the default; the task executor should be aware of
this.

@Chesnay
> My main worry is that if we wire the runtime to work on SSGs it's
> gonna be difficult to implement more fine-grained approaches, which
> would not be the case if, for the runtime, they are always defined on an
> operator-level.

An "add" operation should be less invasive and set a low barrier for
future fine-grained approaches.

@Stephan
>   - Users can define different slot sharing groups for operators like they
> do now, with the exception that you cannot mix operators that have a
> resource profile and operators that have no resource profile.

@Till
> This effectively means that all unspecified operators
> will implicitly have a zero resource requirement.
> I am wondering whether this wouldn't lead to a surprising behaviour for the
> user. If the user specifies the resource requirements for a single
> operator, then he probably will assume that the other operators will get
> the default share of resources and not nothing.

I think it is inherent, due to the fact that we cannot define
ResourceSpec.ONE, e.g. a resource requirement for exactly one default slot,
with concrete numbers. I tend to squash out the unspecified ones if there
are operators in a chain with explicit resource specifications. Otherwise,
the protocol tends to be verbose, as in saying "give me this much resource
and a default". I think if we have explicit resource specifications for only
some operators, it is just saying "I don't care about the other operators
that much, just get them places to run". These are most likely cases of
stateless filter/map or other less resource-consuming operators. If there is
indeed a problem, I think clients can specify a global default (or other
level defaults in the future). In the job-graph generating phase, we could
take that default into account for unspecified operators.

@FLIP-156
> Expose operator chaining. (Cons of task-level resource specifying)

Is this inherent for all group-level resource specifying? It will either
break chaining or obey it, or even could not work with it at all.

To sum up the above, my suggestions are:

On the API side:
* StreamExecutionEnvironment: a global default (ResourceSpec.ZERO if
unspecified).
* Operator: ResourceSpec.ZERO (unspecified) as the default.
* Task: the sum of the requirements of the specified operators + the global
default (if there are any unspecified operators).
* SSG: additional resources for the physical slot.

On the runtime side:
* Task: ResourceSpec.Task or ResourceSpec.ZERO
* SSG: ResourceSpec.SSG or ResourceSpec.ZERO

The physical slot gets the summed-up resources from its logical slots and
the SSG; if it gets ResourceSpec.ZERO, it is just a default-sized slot.

In short, turn SSG resource specifying into an "add" and drop
ResourceSpec.UNKNOWN.
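A minimal sketch of that "add" interpretation, using Flink's existing
ResourceSpec.ZERO and ResourceSpec#merge (the surrounding method is
hypothetical):

```java
import java.util.List;
import org.apache.flink.api.common.operators.ResourceSpec;

final class SlotResourceSketch {
    // Sum the logical slots of a shared slot, then add the SSG extra on top.
    static ResourceSpec slotRequirement(List<ResourceSpec> logicalSlotSpecs,
                                        ResourceSpec ssgExtra) {
        ResourceSpec sum = ResourceSpec.ZERO;
        for (ResourceSpec spec : logicalSlotSpecs) {
            // With UNKNOWN dropped, every spec is mergeable.
            sum = sum.merge(spec);
        }
        return sum.merge(ssgExtra);
    }
}
```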


Questions/Issues:
* Could an SSG express a negative resource requirement?
* Is there a concrete barrier that makes partially configured resources not
work? I saw it will fail job submission in Dispatcher.submitJob.
* An option (cluster/job level) to force slot sharing in the scheduler? This
could be useful in case of migration from FLIP-156 to a future approach.
* An option (cluster level) to ignore resource specifications (allowing a
job with specified resources to run in an open-box environment) for
non-production usage?



On February 1, 2021 at 11:54:10, Yangze Guo (karma...@gmail.com) wrote:

Thanks for the reply, Till and Xintong!

I updated the FLIP, 

Re: [VOTE] FLIP-158: Generalized incremental checkpoints

2021-02-02 Thread Yuan Mei
As aforementioned in the discussion thread, +1 on this Flip vote.

On Wed, Feb 3, 2021 at 6:36 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi everyone,
>
> I'd like to start a vote on FLIP-158 [1] which was discussed in [2].
>
> The vote will be open for at least 72 hours. Unless there are any
> objections,
> I'll close it by February 5, 2021 if we have received sufficient votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
>
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-158-Generalized-incremental-checkpoints-td47902.html
>
>
>
> Regards,
> Roman
>


Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-02 Thread Jane Chan
Hi everyone,

Thanks for the discussion to make this improvement plan clearer.

Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion summaries
now and want to confirm one thing: for the statement `USE MODULES x [,
y, z, ...]`, if the module name list contains a nonexistent module, shall we
#1 fail the execution for all of them, or #2 enable the remaining modules
and return a warning to users? My personal preference goes to #1 for
simplicity. What do you think?

Best,
Jane

On Tue, Feb 2, 2021 at 3:53 PM Timo Walther  wrote:

> +1
>
> @Jane Can you summarize our discussion in the JIRA issue?
>
> Thanks,
> Timo
>
>
> On 02.02.21 03:50, Jark Wu wrote:
> > Hi Timo,
> >
> >> Another question is whether a LOAD operation also adds the module to the
> > enabled list by default?
> >
> > I would like to add the module to the enabled list by default, the main
> > reasons are:
> > 1) Reordering is an advanced requirement, and needing additional
> > USE statements with the "core" module when adding modules
> > sounds too burdensome. Most users should be satisfied with only LOAD
> > statements.
> > 2) We should keep compatibility with TableEnvironment#loadModule().
> > 3) We are using the LOAD statement instead of CREATE, so I think it's
> fine
> > that it does some implicit things.
> >
> > Best,
> > Jark
> >
> > On Tue, 2 Feb 2021 at 00:48, Timo Walther  wrote:
> >
> >> Not the module itself but the ModuleManager should handle this case,
> yes.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 01.02.21 17:35, Jane Chan wrote:
> >>> +1 to Jark's proposal
> >>>
> >>>To make it clearer,  will `module#getFunctionDefinition()` return
> empty
> >>> suppose the module is loaded but not enabled?
> >>>
> >>> Best,
> >>> Jane
> >>>
> >>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther 
> wrote:
> >>>
>  +1 to Jark's proposal
> 
>  I like the difference between just loading and actually enabling these
>  modules.
> 
>  @Rui: I would use the same behavior as catalogs here. You cannot
> `USE` a
>  catalog without creating it before.
> 
>  Another question is whether a LOAD operation also adds the module to
> the
>  enabled list by default?
> 
>  Regards,
>  Timo
> 
>  On 01.02.21 13:52, Rui Li wrote:
> > If `USE MODULES` implies unloading modules that are not listed, does
> it
> > also imply loading modules that are not previously loaded, especially
>  since
> > we're mapping modules by name now?
> >
> > On Mon, Feb 1, 2021 at 8:20 PM Jark Wu  wrote:
> >
> >> I agree with Timo that the USE implies the specified modules are in
> >> use
>  in
> >> the specified order and others are not used.
> >> This would be easier to know what's the result list and order after
> >> the
>  USE
> >> statement.
> >> That means: if current modules in order are x, y, z. And `USE
> MODULES
>  z, y`
> >> means current modules in order are z, y.
> >>
> >> But I would like to not unload the unmentioned modules in the USE
> >> statement, because it seems strange that USE would implicitly remove
> >> modules. In the above example, if the user types a wrong module list
> >> with USE by mistake and wants to declare the list again, they would
> >> have to create the modules again with properties they may not know.
> >> Therefore, I propose that the USE statement just specifies the current
> >> module list and doesn't unload modules.
> >> Besides that, we may need a new syntax to list all the modules
> >> including
> >> not used but loaded.
> >> We can introduce SHOW FULL MODULES for this purpose with an
> additional
> >> `used` column.
> >>
> >> For example:
> >>
> >> Flink SQL> list modules:
> >> ---
> >> | modules |
> >> ---
> >> | x   |
> >> | y   |
> >> | z   |
> >> ---
> >> Flink SQL> USE MODULES z, y;
> >> Flink SQL> show modules:
> >> ---
> >> | modules |
> >> ---
> >> | z   |
> >> | y   |
> >> ---
> >> Flink SQL> show FULL modules;
> >> ---
> >> | modules |  used |
> >> ---
> >> | z   | true  |
> >> | y   | true  |
> >> | x   | false |
> >> ---
> >> Flink SQL> USE MODULES z, y, x;
> >> Flink SQL> show modules;
> >> ---
> >> | modules |
> >> ---
> >> | z   |
> >> | y   |
> >> | x   |
> >> ---
> >>
> >> What do you think?
> >>
> >> Best,
> >> Jark
> >>
> >> On Mon, 1 Feb 2021 at 19:02, Jane Chan 
> wrote:
> >>
> >>> Hi Timo, thanks for the discussion.
> >>>
> >>> It seems we have reached an agreement regarding #3 that <1> the module
> >>> name should be a simple identifier rather than a string literal. 

Re: [VOTE] FLIP-160: Declarative scheduler

2021-02-02 Thread Yu Li
+1 (binding)

Best Regards,
Yu


On Mon, 1 Feb 2021 at 17:21, Matthias Pohl  wrote:

> +1 (non-binding)
>
> Thanks,
> Matthias
>
> On Mon, Feb 1, 2021 at 4:22 AM Zhu Zhu  wrote:
>
> > +1 (binding)
> >
> > Thanks,
> > Zhu
> >
> > Yang Wang  于2021年2月1日周一 上午11:04写道:
> >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Yang
> > >
> > > Yangze Guo  于2021年2月1日周一 上午9:50写道:
> > >
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Sat, Jan 30, 2021 at 8:40 AM Xintong Song 
> > > > wrote:
> > > > >
> > > > > +1 (binding)
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Jan 29, 2021 at 10:41 PM Robert Metzger <
> rmetz...@apache.org
> > >
> > > > wrote:
> > > > >
> > > > > > ... and thanks a lot for your work :) I'm really excited about
> > > finally
> > > > > > adding this feature to Flink!
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 29, 2021 at 3:40 PM Robert Metzger <
> > rmetz...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 (binding)
> > > > > > >
> > > > > > > On Fri, Jan 29, 2021 at 3:23 PM Till Rohrmann <
> > > trohrm...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> since the discussion [1] about FLIP-160 [2] seems to have
> > reached
> > > a
> > > > > > >> consensus, I'd like to start a formal vote for the FLIP.
> > > > > > >>
> > > > > > >> Please vote +1 to approve the FLIP, or -1 with a comment. The
> > vote
> > > > will
> > > > > > be
> > > > > > >> open at least until Wednesday, Feb 3rd.
> > > > > > >>
> > > > > > >> Cheers,
> > > > > > >> Till
> > > > > > >>
> > > > > > >> [1]
> > > > > > >>
> > > > > > >>
> > > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > > >> [2]
> > > > > > >>
> > > > > > >>
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Declarative+Scheduler
> > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
>


Re: [VOTE] FLIP-159: Reactive Mode

2021-02-02 Thread Yu Li
+1 (binding)

Thanks for the efforts Robert!

Best Regards,
Yu


On Mon, 1 Feb 2021 at 17:19, Matthias Pohl  wrote:

> Thanks Robert and congratulations on your first FLIP.
> +1 (non-binding)
>
> Matthias
>
> On Mon, Feb 1, 2021 at 4:22 AM Zhu Zhu  wrote:
>
> > +1 (binding)
> >
> > Thanks,
> > Zhu
> >
> > Till Rohrmann  于2021年1月29日周五 下午10:23写道:
> >
> > > LGTM. Thanks for the work Robert!
> > >
> > > +1 (binding)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Jan 28, 2021 at 11:27 AM Yang Wang 
> > wrote:
> > >
> > > > Thanks Robert for your great work on this FLIP. This is really a big
> > step
> > > > to make Flink auto scalable.
> > > >
> > > > +1 (non-binding)
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > Robert Metzger  于2021年1月28日周四 下午4:32写道:
> > > >
> > > > > @Yangze: That's something I overlooked. I should have waited. If
> > > FLIP-160
> > > > > is rejected or undergoes fundamental changes, I'll cancel this vote
> > and
> > > > > rewrite FLIP-159.
> > > > > But I have the impression that there were no major concerns
> regarding
> > > > > FLIP-160 so far.
> > > > >
> > > > > On Thu, Jan 28, 2021 at 8:46 AM Yangze Guo 
> > wrote:
> > > > >
> > > > > > Thanks for driving this, Robert! LGTM.
> > > > > >
> > > > > > +1
> > > > > >
> > > > > > minor: Just a little confused about the program. It seems this
> > > > > > proposal relies on the FLIP-160, which is still under discussion.
> > > > > > Should we always vote for the prerequisite first?
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > >
> > > > > > On Thu, Jan 28, 2021 at 3:27 PM Xintong Song <
> > tonysong...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Thanks Robert. LGTM.
> > > > > > >
> > > > > > > +1 (binding)
> > > > > > >
> > > > > > > Thank you~
> > > > > > >
> > > > > > > Xintong Song
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jan 28, 2021 at 2:50 PM Robert Metzger <
> > > rmetz...@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > since the discussion [1] about FLIP-159 [2] seems to have
> > > reached a
> > > > > > > > consensus, I'd like to start a formal vote for the FLIP.
> > > > > > > >
> > > > > > > > Please vote +1 to approve the FLIP, or -1 with a comment. The
> > > vote
> > > > > > will be
> > > > > > > > open at least until Tuesday, Feb 2nd.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Robert
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
> > > > > > > > [2]
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>


Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2021-02-02 Thread Jark Wu
Thanks Rui for the great proposal, I believe this can be very attractive
for many Hive users.

The FLIP looks good to me in general, I only have some minor comments:

1) BlinkParserFactory
IIUC, BlinkParserFactory is located in the flink-table-api-java module with
the Parser interface there.
I suggest renaming it to `ParserFactory`, because it creates Parser instead
of BlinkParser.
And the implementations can be `HiveParserFactory` and
`FlinkParserFactory`.
I think we should avoid the `Blink` keyword in interfaces: the blink planner
is already the default planner and the old planner will be removed in the
near future, so there will be no `blink` then.

2) "create a new instance each time getParser is called"
Finding parser for every time getParser is called sounds heavy to me. I
think we can improve this by simplify
caching the Parser instance,  and creating a new one if current sql-dialect
is different from the cached Parser.
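A minimal sketch of that caching strategy (the class and names are
illustrative, not the proposed API):

```java
import java.util.function.Supplier;

final class CachingParserProvider<P> {
    private String cachedDialect;
    private P cachedParser;

    // Rebuild the parser only when the session's SQL dialect has changed.
    P getParser(String currentDialect, Supplier<P> parserFactory) {
        if (cachedParser == null || !currentDialect.equals(cachedDialect)) {
            cachedParser = parserFactory.get();
            cachedDialect = currentDialect;
        }
        return cachedParser;
    }
}
```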

3) Hive version
How much work is needed to support the new features of 3.x based on 2.x?
Is this also included in this FLIP/release?
I don't fully understand this, because the FLIP says "we can use a newer
version to support older versions."

Best,
Jark

On Wed, 3 Feb 2021 at 11:48, godfrey he  wrote:

> Thanks for bringing up the discussion, Rui!
>
> Regarding the DDL part in the "Introduce HiveParser" section,
> I would like to choose the second option, because if we could
> use one hive parser to parse all hive SQL, we would not need to copy the
> Calcite parser code, and the framework and the code would be very simple.
>
> Regarding the "Go Beyond Hive" section, is that the scope of this FLIP ?
> Could you list all the extensions and give some examples ?
>
> One minor suggestion about the name of ParserImplFactory.
> How about renaming ParserImplFactory to DefaultParserFactory ?
>
> Best,
> Godfrey
>
> Rui Li  于2021年2月3日周三 上午11:16写道:
>
> > Hi Jingsong,
> >
> > Thanks for your comments and they're very good questions.
> >
> > Regarding # Version, we need to do some tradeoff here. Choosing the
> latest
> > 3.x will cover all the features we want to support. But as you said, 3.x
> > and 2.x can have some differences and requires more efforts to support
> > lower versions. I decided to pick 2.x and evolve from there to support
> new
> > features in 3.x. Because I think most hive users, especially those who
> are
> > likely to be interested in this feature, are still using 2.x or even 1.x.
> > So the priority is to cover 2.x and 1.x first.
> >
> > Regarding # Hive Codes, in my PoC, I simply copy the code and make as few
> > changes as possible. I believe we can do some clean up or refactor to
> > reduce it. With that in mind, I expect it to be over 10k lines of java
> > code, and even more if we count ANTLR grammar files as well.
> >
> > Regarding # Functions, you're right that HiveModule is more of a solution
> > than limitation. I just want to emphasize that HiveModule and hive
> dialect
> > need to be used together to achieve better compatibility.
> >
> > Regarding # Keywords, new hive versions can have more reserved keywords
> > than old versions. Since we're based on hive 2.x code, it may not provide
> > 100% keyword-compatibility to 1.x users. But I expect it to be good
> enough
> > for most cases. If not, we can provide different grammar files for lower
> > versions.
> >
> > On Tue, Feb 2, 2021 at 5:10 PM Jingsong Li 
> wrote:
> >
> > > Thanks Rui for the proposal, I think this FLIP is required by many
> users,
> > > and it is very valuable to traditional Hive users. I have a few questions:
> > >
> > > # Version
> > >
> > > Which Hive version do you want to choose? Maybe, Hive 3.X and Hive 2.X
> > have
> > > some differences?
> > >
> > > # Hive Codes
> > >
> > > Can you evaluate how much code we need to copy to our
> > flink-hive-connector?
> > > Do we need to change them? We need to maintain them anyway.
> > >
> > > # Functions
> > >
> > > About Hive functions, I don't think it is a limitation, we are using
> > > HiveModule to be compatible with Hive, right? So it is a solution
> instead
> > > of a limitation.
> > >
> > > # Keywords
> > >
> > > Do you think there will be a keyword problem? Or can we be 100%
> > compatible
> > > with Hive?
> > >
> > > On the whole, the FLIP looks very good and I'm looking forward to it.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Fri, Dec 11, 2020 at 11:35 AM Zhijiang
> > >  wrote:
> > >
> > > > Thanks for the further info and explanations! I have no other
> concerns.
> > > >
> > > > Best,
> > > > Zhijiang
> > > >
> > > >
> > > > --
> > > > From:Rui Li 
> > > > Send Time:2020年12月10日(星期四) 20:35
> > > > To:dev ; Zhijiang 
> > > > Subject:Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility
> > > >
> > > > Hi Zhijiang,
> > > >
> > > > Glad to know you're interested in this FLIP. I wouldn't claim 100%
> > > > compatibility with this FLIP. That's because Flink doesn't have the
> > > 

Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2021-02-02 Thread godfrey he
Thanks for bringing up the discussion, Rui!

Regarding the DDL part in the "Introduce HiveParser" section,
I would like to choose the second option, because if we could
use one hive parser to parse all hive SQL, we would not need to copy the
Calcite parser code, and the framework and the code would be very simple.

Regarding the "Go Beyond Hive" section, is that the scope of this FLIP ?
Could you list all the extensions and give some examples ?

One minor suggestion about the name of ParserImplFactory.
How about renaming ParserImplFactory to DefaultParserFactory ?

Best,
Godfrey

Rui Li  于2021年2月3日周三 上午11:16写道:

> Hi Jingsong,
>
> Thanks for your comments and they're very good questions.
>
> Regarding # Version, we need to do some tradeoff here. Choosing the latest
> 3.x will cover all the features we want to support. But as you said, 3.x
> and 2.x can have some differences and requires more efforts to support
> lower versions. I decided to pick 2.x and evolve from there to support new
> features in 3.x. Because I think most hive users, especially those who are
> likely to be interested in this feature, are still using 2.x or even 1.x.
> So the priority is to cover 2.x and 1.x first.
>
> Regarding # Hive Codes, in my PoC, I simply copy the code and make as few
> changes as possible. I believe we can do some clean up or refactor to
> reduce it. With that in mind, I expect it to be over 10k lines of java
> code, and even more if we count ANTLR grammar files as well.
>
> Regarding # Functions, you're right that HiveModule is more of a solution
> than limitation. I just want to emphasize that HiveModule and hive dialect
> need to be used together to achieve better compatibility.
>
> Regarding # Keywords, new hive versions can have more reserved keywords
> than old versions. Since we're based on hive 2.x code, it may not provide
> 100% keyword-compatibility to 1.x users. But I expect it to be good enough
> for most cases. If not, we can provide different grammar files for lower
> versions.
>
> On Tue, Feb 2, 2021 at 5:10 PM Jingsong Li  wrote:
>
> > Thanks Rui for the proposal, I think this FLIP is required by many users,
> > and it is very good for traditional Hive users. I have a few questions:
> >
> > # Version
> >
> > Which Hive version do you want to choose? Maybe, Hive 3.X and Hive 2.X
> have
> > some differences?
> >
> > # Hive Codes
> >
> > Can you evaluate how much code we need to copy to our
> flink-hive-connector?
> > Do we need to change them? We need to maintain them anyway.
> >
> > # Functions
> >
> > About Hive functions, I don't think it is a limitation, we are using
> > HiveModule to be compatible with Hive, right? So it is a solution instead
> > of a limitation.
> >
> > # Keywords
> >
> > Do you think there will be a keyword problem? Or can we be 100%
> compatible
> > with Hive?
> >
> > On the whole, the FLIP looks very good and I'm looking forward to it.
> >
> > Best,
> > Jingsong
> >
> > On Fri, Dec 11, 2020 at 11:35 AM Zhijiang
> >  wrote:
> >
> > > Thanks for the further info and explanations! I have no other concerns.
> > >
> > > Best,
> > > Zhijiang
> > >
> > >
> > > --
> > > From:Rui Li 
> > > Send Time:2020年12月10日(星期四) 20:35
> > > To:dev ; Zhijiang 
> > > Subject:Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility
> > >
> > > Hi Zhijiang,
> > >
> > > Glad to know you're interested in this FLIP. I wouldn't claim 100%
> > > compatibility with this FLIP. That's because Flink doesn't have the
> > > functionalities to support all Hive's features. To list a few examples:
> > >
> > >1. Hive allows users to process data with shell scripts -- very
> > similar
> > >to UDFs [1]
> > >2. Users can compile inline Groovy UDFs and use them in queries [2]
> > >3. Users can dynamically add/delete jars, or even execute arbitrary
> > >shell command [3]
> > >
> > > These features cannot be supported merely by a parser/planner, and it's
> > > open to discussion whether Flink even should support them at all.
> > >
> > > So the ultimate goal of this FLIP is to provide Hive syntax
> compatibility
> > > to features that are already available in Flink, which I believe will
> > cover
> > > most common use cases.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-TRANSFORMExamples
> > > [2]
> > >
> > >
> >
> https://community.cloudera.com/t5/Community-Articles/Apache-Hive-Groovy-UDF-examples/ta-p/245060
> > > [3]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveInteractiveShellCommands
> > >
> > > On Thu, Dec 10, 2020 at 6:11 PM Zhijiang wrote:
> > >
> > > > Thanks for launching the discussion and the FLIP, Rui!
> > > >
> > > > It is really nice to see our continuous efforts for compatibility
> with
> > > > Hive and benefiting users in this area.
> > > > I am only curious that are 

Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2021-02-02 Thread Rui Li
Hi Jingsong,

Thanks for your comments and they're very good questions.

Regarding # Version, we need to do some tradeoff here. Choosing the latest
3.x will cover all the features we want to support. But as you said, 3.x
and 2.x can have some differences, and it requires more effort to support
lower versions. I decided to pick 2.x and evolve from there to support new
features in 3.x, because I think most hive users, especially those who are
likely to be interested in this feature, are still using 2.x or even 1.x.
So the priority is to cover 2.x and 1.x first.

Regarding # Hive Codes, in my PoC, I simply copied the code and made as few
changes as possible. I believe we can do some cleanup or refactoring to
reduce it. With that in mind, I expect it to be over 10k lines of java
code, and even more if we count ANTLR grammar files as well.

Regarding # Functions, you're right that HiveModule is more of a solution
than a limitation. I just want to emphasize that HiveModule and hive dialect
need to be used together to achieve better compatibility.

Regarding # Keywords, new hive versions can have more reserved keywords
than old versions. Since we're based on hive 2.x code, it may not provide
100% keyword-compatibility to 1.x users. But I expect it to be good enough
for most cases. If not, we can provide different grammar files for lower
versions.

On Tue, Feb 2, 2021 at 5:10 PM Jingsong Li  wrote:

> Thanks Rui for the proposal, I think this FLIP is required by many users,
> and it is very good for traditional Hive users. I have a few questions:
>
> # Version
>
> Which Hive version do you want to choose? Maybe, Hive 3.X and Hive 2.X have
> some differences?
>
> # Hive Codes
>
> Can you evaluate how much code we need to copy to our flink-hive-connector?
> Do we need to change them? We need to maintain them anyway.
>
> # Functions
>
> About Hive functions, I don't think it is a limitation, we are using
> HiveModule to be compatible with Hive, right? So it is a solution instead
> of a limitation.
>
> # Keywords
>
> Do you think there will be a keyword problem? Or can we be 100% compatible
> with Hive?
>
> On the whole, the FLIP looks very good and I'm looking forward to it.
>
> Best,
> Jingsong
>
> On Fri, Dec 11, 2020 at 11:35 AM Zhijiang
>  wrote:
>
> > Thanks for the further info and explanations! I have no other concerns.
> >
> > Best,
> > Zhijiang
> >
> >
> > --
> > From:Rui Li 
> > Send Time:2020年12月10日(星期四) 20:35
> > To:dev ; Zhijiang 
> > Subject:Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility
> >
> > Hi Zhijiang,
> >
> > Glad to know you're interested in this FLIP. I wouldn't claim 100%
> > compatibility with this FLIP. That's because Flink doesn't have the
> > functionalities to support all Hive's features. To list a few examples:
> >
> >1. Hive allows users to process data with shell scripts -- very
> similar
> >to UDFs [1]
> >2. Users can compile inline Groovy UDFs and use them in queries [2]
> >3. Users can dynamically add/delete jars, or even execute arbitrary
> >shell command [3]
> >
> > These features cannot be supported merely by a parser/planner, and it's
> > open to discussion whether Flink even should support them at all.
> >
> > So the ultimate goal of this FLIP is to provide Hive syntax compatibility
> > to features that are already available in Flink, which I believe will
> cover
> > most common use cases.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-TRANSFORMExamples
> > [2]
> >
> >
> https://community.cloudera.com/t5/Community-Articles/Apache-Hive-Groovy-UDF-examples/ta-p/245060
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveInteractiveShellCommands
> >
> > On Thu, Dec 10, 2020 at 6:11 PM Zhijiang wrote:
> >
> > > Thanks for launching the discussion and the FLIP, Rui!
> > >
> > > It is really nice to see our continuous efforts for compatibility with
> > > Hive and benefiting users in this area.
> > > I am only curious that are there any other compatible limitations for
> > Hive
> > > users after this FLIP? Or can I say that the Hive compatibility is
> > > completely resolved after this FLIP?
> > > I am interested in the ultimate goal in this area. Maybe it is out of
> > this
> > > FLIP scope, but still wish some insights from you if possible. :)
> > >
> > > Best,
> > > Zhijiang
> > >
> > >
> > > --
> > > From:Rui Li 
> > > Send Time:2020年12月10日(星期四) 16:46
> > > To:dev 
> > > Subject:Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility
> > >
> > > Thanks Kurt for your inputs!
> > >
> > > I agree we should extend Hive code to support non-Hive tables. I have
> > > updated the wiki page to remove the limitations you mentioned, and added
> > > typical use cases in the "Motivation" section.
> > >
> > > 

Re: [DISCUSS] FLINK-21110: Optimize Scheduler Performance for Large-Scale Jobs

2021-02-02 Thread Guowei Ma
Thanks to Zhilong for improving the scheduler's performance.
I think many users would benefit from your work!
Best,
Guowei


On Wed, Feb 3, 2021 at 2:04 AM Till Rohrmann  wrote:

> Thanks for making the community aware of these performance improvements,
> Zhilong. I like them and I am looking forward to a faster Flink :-)
>
> Cheers,
> Till
>
> On Tue, Feb 2, 2021 at 11:00 AM Zhilong Hong 
> wrote:
>
> > Hello, everyone:
> >
> > I would like to start the discussion about FLINK-21110: Optimize
> Scheduler
> > Performance for Large-Scale Jobs [1].
> >
> > According to the result of scheduler benchmarks we implemented in
> > FLINK-20612 [2], the bottleneck of deploying and running a large-scale
> job
> > in Flink mainly lies in the following procedures:
> >
> > Procedure                                             Time complexity
> > Initializing ExecutionGraph                           O(N^2)
> > Building DefaultExecutionTopology                     O(N^2)
> > Initializing PipelinedRegionSchedulingStrategy        O(N^2)
> > Scheduling downstream tasks when a task finishes      O(N^2)
> > Calculating tasks to restart when a failover occurs   O(N^2)
> > Releasing result partitions                           O(N^2)
> >
> > These procedures are all related to the complexity of the topology in the
> > ExecutionGraph. Between two vertices connected with the all-to-all edges,
> > all the upstream Intermediate ResultPartitions are connected to all
> > downstream ExecutionVertices. The computation complexity of building and
> > traversing all these edges will be O(N^2).
> >
> > As for memory usage, currently we use ExecutionEdges to store the
> > information of connections. For the all-to-all distribution type, there
> are
> > O(N^2) ExecutionEdges. We tested a simple job with only two vertices. The
> > parallelisms of both are 10k. Furthermore, they are connected with
> > all-to-all edges. It takes 4.175 GiB (estimated via MXBean) to store the
> > 100M ExecutionEdges.
> >
> > In most large-scale jobs, there will be more than two vertices with large
> > parallelisms, and they would cost a lot of time and memory to deploy the
> > job.
> >
> > As we can see, for two JobVertices connected with the all-to-all
> > distribution type, all IntermediateResultPartitions produced by the
> > upstream ExecutionVertices are isomorphic, which means that the
> downstream
> > ExecutionVertices they connected are exactly the same. The downstream
> > ExecutionVertices belonging to the same JobVertex are also isomorphic, as
> > the upstream ResultPartitions they connect are the same, too.
> >
> > Since every JobEdge has exactly one distribution type, we can divide the
> > vertices and result partitions into groups according to the distribution
> > type of the JobEdge.
> >
> > For the all-to-all distribution type, since all downstream vertices are
> > isomorphic, they belong to a single group, and all the upstream result
> > partitions are connected to this group. Vice versa, all the upstream
> result
> > partitions also belong to a single group, and all the downstream vertices
> > are connected to this group. In the past, when we wanted to iterate all
> the
> > downstream vertices, we needed to loop over them n times, which leads to
> > the complexity of O(N^2). Now since all upstream result partitions are
> > connected to one downstream group, we just need to loop over them once,
> > with the complexity of O(N).
> >
> > For the pointwise distribution type, because each result partition is
> > connected to different downstream vertices, they should belong to
> different
> > groups. Vice versa, all the vertices belong to different groups. Since
> one
> > result partition group is connected to one vertex group in a pointwise manner, the
> > computation complexity of looping over them is still O(N).
> >
> > After we group the result partitions and vertices, ExecutionEdge is no
> > longer needed. For the test job we mentioned above, the optimization can
> > effectively reduce the memory usage from 4.175 GiB to 12.076 MiB
> (estimated
> > via MXBean) in our POC. The time cost is reduced from 62.090 seconds to
> > 8.551 seconds (with 10k parallelism).
> >
> > The detailed design doc with illustrations is located at [3]. Please find
> > more details in the links below.
> >
> > Looking forward to your feedback.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21110
> > [2] https://issues.apache.org/jira/browse/FLINK-20612
> > [3]
> >
> https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
> >
> >
>


[VOTE] FLIP-158: Generalized incremental checkpoints

2021-02-02 Thread Khachatryan Roman
Hi everyone,

I'd like to start a vote on FLIP-158 [1] which was discussed in [2].

The vote will be open for at least 72 hours. Unless there are any
objections,
I'll close it by February 5, 2021 if we have received sufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-158-Generalized-incremental-checkpoints-td47902.html



Regards,
Roman


Re: [DISCUSS] FLINK-21110: Optimize Scheduler Performance for Large-Scale Jobs

2021-02-02 Thread Till Rohrmann
Thanks for making the community aware of these performance improvements,
Zhilong. I like them and I am looking forward to a faster Flink :-)

Cheers,
Till

On Tue, Feb 2, 2021 at 11:00 AM Zhilong Hong  wrote:

> Hello, everyone:
>
> I would like to start the discussion about FLINK-21110: Optimize Scheduler
> Performance for Large-Scale Jobs [1].
>
> According to the result of scheduler benchmarks we implemented in
> FLINK-20612 [2], the bottleneck of deploying and running a large-scale job
> in Flink mainly lies in the following procedures:
>
> Procedure                                             Time complexity
> Initializing ExecutionGraph                           O(N^2)
> Building DefaultExecutionTopology                     O(N^2)
> Initializing PipelinedRegionSchedulingStrategy        O(N^2)
> Scheduling downstream tasks when a task finishes      O(N^2)
> Calculating tasks to restart when a failover occurs   O(N^2)
> Releasing result partitions                           O(N^2)
>
> These procedures are all related to the complexity of the topology in the
> ExecutionGraph. Between two vertices connected with the all-to-all edges,
> all the upstream Intermediate ResultPartitions are connected to all
> downstream ExecutionVertices. The computation complexity of building and
> traversing all these edges will be O(N^2).
>
> As for memory usage, currently we use ExecutionEdges to store the
> information of connections. For the all-to-all distribution type, there are
> O(N^2) ExecutionEdges. We tested a simple job with only two vertices. The
> parallelisms of both are 10k. Furthermore, they are connected with
> all-to-all edges. It takes 4.175 GiB (estimated via MXBean) to store the
> 100M ExecutionEdges.
>
> In most large-scale jobs, there will be more than two vertices with large
> parallelisms, and they would cost a lot of time and memory to deploy the
> job.
>
> As we can see, for two JobVertices connected with the all-to-all
> distribution type, all IntermediateResultPartitions produced by the
> upstream ExecutionVertices are isomorphic, which means that the downstream
> ExecutionVertices they connected are exactly the same. The downstream
> ExecutionVertices belonging to the same JobVertex are also isomorphic, as
> the upstream ResultPartitions they connect are the same, too.
>
> Since every JobEdge has exactly one distribution type, we can divide the
> vertices and result partitions into groups according to the distribution
> type of the JobEdge.
>
> For the all-to-all distribution type, since all downstream vertices are
> isomorphic, they belong to a single group, and all the upstream result
> partitions are connected to this group. Vice versa, all the upstream result
> partitions also belong to a single group, and all the downstream vertices
> are connected to this group. In the past, when we wanted to iterate all the
> downstream vertices, we needed to loop over them n times, which leads to
> the complexity of O(N^2). Now since all upstream result partitions are
> connected to one downstream group, we just need to loop over them once,
> with the complexity of O(N).
>
> For the pointwise distribution type, because each result partition is
> connected to different downstream vertices, they should belong to different
> groups. Vice versa, all the vertices belong to different groups. Since one
> result partition group is connected to one vertex group in a pointwise manner, the
> computation complexity of looping over them is still O(N).
>
> After we group the result partitions and vertices, ExecutionEdge is no
> longer needed. For the test job we mentioned above, the optimization can
> effectively reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated
> via MXBean) in our POC. The time cost is reduced from 62.090 seconds to
> 8.551 seconds (with 10k parallelism).
>
> The detailed design doc with illustrations is located at [3]. Please find
> more details in the links below.
>
> Looking forward to your feedback.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21110
> [2] https://issues.apache.org/jira/browse/FLINK-20612
> [3]
> https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
>
>


[jira] [Created] (FLINK-21243) Add Java SDK maven module

2021-02-02 Thread Igal Shilman (Jira)
Igal Shilman created FLINK-21243:


 Summary: Add Java SDK maven module
 Key: FLINK-21243
 URL: https://issues.apache.org/jira/browse/FLINK-21243
 Project: Flink
  Issue Type: Task
  Components: Stateful Functions
Reporter: Igal Shilman


Add the skeleton maven module that will contain the Java SDK.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Jark Wu
Hi Fabian,

I think we have an agreement that the functions should be evaluated at
query start in batch mode,
because all the other batch systems and traditional databases behave this
way, which is standard SQL compliant.

*1. The point where views differ is: what's the behavior in streaming mode?*

From my point of view, I don't see any practical value in evaluating at
query start for a long-running streaming job (e.g., one that runs for 365 days).
And from my observation, CURRENT_TIMESTAMP is heavily used by Flink
streaming users, and they expect the current behavior.
The SQL standard only provides a guideline for traditional batch systems;
however, Flink is a leading stream processing system that is out of the
scope of the SQL standard, and Flink should help define the streaming
standard. I think a standard should follow users' intuition.
Therefore, I think we don't need to be standard SQL compliant at this point
because users don't expect it.
Changing the behavior of the functions to evaluate at query start in
streaming mode would hurt most Flink SQL users while gaining us nothing,
so we should avoid it.

*2. Does it break the unified streaming-batch semantics?*

I don't think so. First of all, what does the unified streaming-batch
semantic mean?
I think it means the *eventual result* instead of the *behavior*.
It's hard to say we have provided unified behavior for streaming and batch
jobs because, for example, unbounded aggregation behaves very differently.
In batch mode, it evaluates only once over the bounded data and emits the
aggregate result once.
But in streaming mode, it evaluates for each row and emits the updated
result.
What we have always emphasized about "unified streaming-batch semantics" is [1]:

> a query produces exactly the same result regardless whether its input is
static batch data or streaming data.

From my understanding, the "semantic" means the "eventual result".
And time functions are non-deterministic, so it's reasonable to get
different results in batch and streaming mode.
Therefore, I think evaluating per record in streaming mode and at query
start in batch mode doesn't break the unified streaming-batch semantics,
as the semantics refer to the result rather than the behavior.
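
For illustration, a minimal sketch (plain Java, not Flink code) of the two
evaluation semantics being compared:

```
import java.time.Instant;
import java.util.List;

public class TimeFunctionSemantics {
    public static void main(String[] args) throws InterruptedException {
        List<String> records = List.of("a", "b", "c");

        // Query-start semantics (batch / SQL standard): evaluate once, reuse.
        Instant queryStart = Instant.now();
        for (String r : records) {
            System.out.println(r + " -> " + queryStart); // same value for every row
        }

        // Per-record semantics (current Flink streaming behavior): re-evaluate
        // on every incoming row.
        for (String r : records) {
            Thread.sleep(10);
            System.out.println(r + " -> " + Instant.now()); // advances per row
        }
    }
}
```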

Best,
Jark

[1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html

On Tue, 2 Feb 2021 at 18:34, Fabian Hueske  wrote:

> Hi everyone,
>
> Sorry for joining this discussion late.
> Let me give some thought to two of the arguments raised in this thread.
>
> Time functions are inherently non-deterministic:
> --
> This is of course true, but IMO it doesn't mean that the semantics of time
> functions do not matter.
> It makes a difference whether a function is evaluated once and its result
> is reused or whether it is invoked for every record.
> Would you use the same logic to justify different behavior of RAND() in
> batch and streaming queries?
>
> Provide the semantics that most users expect:
> --
> I don't think it is clear what most users expect, esp. if we also include
> future users (which we certainly want to gain) into this assessment.
> Our current users got used to the semantics that we introduced. So I
> wouldn't be surprised if they would say stick with the current semantics.
> However, we are also claiming standard SQL compliance and stress the goal
> of batch-stream unification.
> So I would assume that new SQL users expect standard compliant behavior for
> batch and streaming queries.
>
>
> IMO, we should try hard to stick to our goals of 1) unified batch-streaming
> semantics and 2) SQL standard compliance.
> For me this means that the semantics of the functions should be adjusted to
> be evaluated at query start by default for batch and streaming queries.
> Obviously this would affect *many* current users of streaming SQL.
> For those we should provide two solutions:
>
> 1) Add alternative methods that provide the current behavior of the time
> functions.
> I like Timo's proposal to add a prefix like SYS_ (or PROC_) but don't care
> too much about the names.
> The important point is that users need alternative functions to provide the
> desired semantics.
>
> 2) Add a configuration option to reestablish the current behavior of the
> time functions.
> IMO, the configuration option should not be considered as a permanent
> option but rather as a migration path towards the "right" (standard
> compliant) behavior.
>
> Best, Fabian
>
> Am Di., 2. Feb. 2021 um 09:51 Uhr schrieb Kurt Young :
>
> > BTW I also don't like to introduce an option for this case as the
> > first step.
> >
> > If we can find a default behavior which can make 90% of users happy, we
> should
> > do it. If the remaining
> > 10% of users start to complain about the fixed behavior (it's also
> > possible that they never complain),
> > we could offer an option to make them happy. If it turns out that we had
> > misjudged the users'
> > expectations, we should change the default behavior.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Feb 2, 2021 at 4:46 PM 

[jira] [Created] (FLINK-21242) Support state access API for map operation

2021-02-02 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-21242:
-

 Summary: Support state access API for map operation
 Key: FLINK-21242
 URL: https://issues.apache.org/jira/browse/FLINK-21242
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Wei Zhong
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21241) E2E test exception check might fail if older checkpoint not completed when triggering the stop-with-savepoint

2021-02-02 Thread Yun Tang (Jira)
Yun Tang created FLINK-21241:


 Summary: E2E test exception check might fail if older checkpoint 
not completed when triggering the stop-with-savepoint
 Key: FLINK-21241
 URL: https://issues.apache.org/jira/browse/FLINK-21241
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Tests
Affects Versions: 1.12.1, 1.11.3, 1.13.0
Reporter: Yun Tang
 Fix For: 1.11.4, 1.12.2, 1.13.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12724&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529]
 The E2E test fails because exceptions are found in the logs.

This is because we trigger the {{stop-with-savepoint}} (savepoint 13) when
checkpoint 12 has not yet completed.

{code:java}
2021-02-01 07:54:22,542 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 12 (type=CHECKPOINT) @ 1612166062541 for job 
603ede5f1dcc9858f8db6a787992c181.
2021-02-01 07:54:22,543 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Triggering stop-with-savepoint for job 
603ede5f1dcc9858f8db6a787992c181.
2021-02-01 07:54:22,599 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 13 (type=SYNC_SAVEPOINT) @ 1612166062597 for job 
603ede5f1dcc9858f8db6a787992c181.
{code}

Since we stop the checkpoint scheduler when triggering the
{{stop-with-savepoint}}, the old checkpoint 12 is aborted, and we can
find exceptions in the task managers:

{code:java}
2021-02-01 07:54:22,592 WARN  
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Could not 
properly clean up the async checkpoint runnable.
java.lang.IllegalStateException: null
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1176)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportAbortedSnapshotStats(AsyncCheckpointRunnable.java:223)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.close(AsyncCheckpointRunnable.java:306)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:275) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancelAsyncCheckpointRunnable(SubtaskCheckpointCoordinatorImpl.java:451)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:340)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$12(StreamTask.java:1070)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1083)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 [flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:314)
 [flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:300)
 [flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:188)
 [flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:615)
 [flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:579) 
[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) 
[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 

[jira] [Created] (FLINK-21240) incompati datetime

2021-02-02 Thread macdoor615 (Jira)
macdoor615 created FLINK-21240:
--

 Summary: incompati datetime 
 Key: FLINK-21240
 URL: https://issues.apache.org/jira/browse/FLINK-21240
 Project: Flink
  Issue Type: Bug
Reporter: macdoor615






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-02 Thread Yang Wang
+1 (non-binding)

Best,
Yang

Chesnay Schepler  于2021年2月2日周二 下午5:50写道:

> +1
>
> On 2/2/2021 10:11 AM, Till Rohrmann wrote:
> > +1 (binding)
> >
> > Cheers,
> > Till
> >
> > On Mon, Feb 1, 2021 at 5:38 AM Xintong Song 
> wrote:
> >
> >> +1 (binding)
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Mon, Feb 1, 2021 at 11:56 AM Yangze Guo  wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> I'd like to start the vote of FLIP-156 [1]. This FLIP is discussed in
> >>> the thread[2].
> >>>
> >>> The vote will be open for at least 72 hours. Unless there is an
> >> objection,
> >>> I will try to close it by February 4, 2021 if we have received
> >>> sufficient votes.
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
> >>> [2]
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-156-Runtime-Interfaces-for-Fine-Grained-Resource-Requirements-td47650.html
> >>> Best,
> >>> Yangze Guo
> >>>
>
>


Re: Proposal to add Google Cloud Storage FileSystem with RecoverableWriter

2021-02-02 Thread Xintong Song
Hi Galen,

Thanks for offering the contribution.

As Till has already suggested, please post your solution proposal as a
comment on FLINK-11838.
Once we reach consensus on the proposal, I'll assign you to the ticket.

Thank you~

Xintong Song



On Tue, Feb 2, 2021 at 5:19 PM Till Rohrmann  wrote:

> Hi Galen,
>
> I think that adding support for GCS using the StreamingFileSink sounds like
> a very good idea to me. Looking at FLINK-11838 I believe that this effort
> has been abandoned. I think that you could take this ticket over if you
> want. Maybe you could update this ticket with your solution proposal.
>
> I will check whether I can find a committer who could help you with this
> effort.
>
> Cheers,
> Till
>
> On Sat, Jan 30, 2021 at 7:43 PM Galen Warren 
> wrote:
>
> > Hi -- I'm wondering if you would be interested in a contribution to add a
> > HadoopFileSystem implementation, with associated RecoverableWriter, for
> > Google Cloud Storage. This would be similar to what's already in place
> for
> > S3, and it would allow writing to GCS using a StreamingFileSink. The
> > implementation would be similar to what's already in place for S3.
> >
> > I see there's been some work on this before (FLINK-11838: Add GCS
> > RecoverableWriter by Fokko · Pull Request #7915 · apache/flink,
> > https://github.com/apache/flink/pull/7915), but the original people
> > working on it have put it on hold, and the last activity was over six
> > months ago.
> >
> > I need this for my own purposes and I have an implementation that I'm
> > working on locally. I'd be interested to contribute this if you'd be
> > interested. Let me know if so and I'll create a Jira ticket.
> >
> > Thanks,
> > Galen Warren
> >
>


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-02 Thread Timo Walther
Thanks for this great proposal Shengkai. This will give the SQL Client a 
very good update and make it production ready.


Here is some feedback from my side:

1) SQL client specific options

I don't think that `sql-client.planner` and `sql-client.execution.mode` 
are SQL Client specific. Similar to `StreamExecutionEnvironment` and 
`ExecutionConfig#configure` that have been added recently, we should 
offer a possibility for TableEnvironment. How about we offer 
`TableEnvironment.create(ReadableConfig)` and add a `table.planner` and 
`table.execution-mode` to 
`org.apache.flink.table.api.config.TableConfigOptions`?
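
As a hedged sketch, the proposed API could look like this in use (the
`TableEnvironment.create(ReadableConfig)` factory and the `table.planner` /
`table.execution-mode` keys are the proposal above, not an existing API at
the time of this thread):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class ProposedCreateSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("table.planner", "blink");             // proposed option
        conf.setString("table.execution-mode", "streaming");  // proposed option

        // Proposed factory: options that cannot change after initialization
        // are fixed here, analogous to StreamExecutionEnvironment#configure
        // for the mutable ones.
        TableEnvironment tEnv = TableEnvironment.create(conf);
        tEnv.executeSql("SHOW TABLES").print();
    }
}
```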


2) Execution file

Did you have a look at the Appendix of FLIP-84 [1] including the mailing 
list thread at that time? Could you further elaborate how the 
multi-statement execution should work for a unified batch/streaming 
story? According to our past discussions, each line in an execution file 
should be executed blocking which means a streaming query needs a 
statement set to execute multiple INSERT INTO statement, correct? We 
should also offer this functionality in 
`TableEnvironment.executeMultiSql()`. Whether `sql-client.job.detach` is 
SQL Client specific needs to be determined, it could also be a general 
`table.multi-sql-async` option?


3) DELETE JAR

Shouldn't the opposite of "ADD" be "REMOVE"? "DELETE" sounds like one is 
actively deleting the JAR in the corresponding path.


4) LIST JAR

This should be `SHOW JARS` according to other SQL commands such as `SHOW 
CATALOGS`, `SHOW TABLES`, etc. [2].


5) EXPLAIN [ExplainDetail[, ExplainDetail]*]

We should keep the details in sync with 
`org.apache.flink.table.api.ExplainDetail` and avoid confusion about 
differently named ExplainDetails. I would vote for `ESTIMATED_COST` 
instead of `COST`. I'm sure the original author had a reason why to call 
it that way.
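
For reference, a minimal sketch of the existing Table API counterpart that
the SQL syntax would mirror (`ExplainDetail.ESTIMATED_COST` and
`CHANGELOG_MODE` already exist in `org.apache.flink.table.api`; the table
definition below is made up):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql("CREATE TABLE src (a INT) WITH ('connector' = 'datagen')");

        // Table API equivalent of the proposed EXPLAIN ESTIMATED_COST ... syntax.
        String plan = tEnv.explainSql(
                "SELECT a, COUNT(*) FROM src GROUP BY a",
                ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE);
        System.out.println(plan);
    }
}
```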


6) Implementation details

It would be nice to understand how we plan to implement the given 
features. Most of the commands and config options should go into 
TableEnvironment and SqlParser directly, correct? This way users have a 
unified way of using Flink SQL. TableEnvironment would provide a similar 
user experience in notebooks or interactive programs as the SQL Client.


[1] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/show.html


Regards,
Timo


On 02.02.21 10:13, Shengkai Fang wrote:

Sorry for the typo. I mean `RESET` is much better than `UNSET`.

Shengkai Fang  于2021年2月2日周二 下午4:44写道:


Hi, Jingsong.

Thanks for your reply. I think `UNSET` is much better.

1. We don't need to introduce another command `UNSET`. `RESET` is
supported in the current sql client now. Our proposal just extends its
grammar and allows users to reset the specified keys.
2. Hive beeline also uses `RESET` to set the key to the default value[1].
I think it is more friendly for batch users.

Best,
Shengkai

[1] https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients

Jingsong Li  于2021年2月2日周二 下午1:56写道:


Thanks for the proposal, yes, sql-client is too outdated. +1 for
improving it.

About "SET"  and "RESET", Why not be "SET" and "UNSET"?

Best,
Jingsong

On Mon, Feb 1, 2021 at 2:46 PM Rui Li  wrote:


Thanks Shengkai for the update! The proposed changes look good to me.

On Fri, Jan 29, 2021 at 8:26 PM Shengkai Fang  wrote:


Hi, Rui.
You are right. I have already modified the FLIP.

The main changes:

# -f parameter has no restriction about the statement type.
Sometimes, users use the pipe to redirect the result of queries to

debug

when submitting job by -f parameter. It's much convenient comparing to
writing INSERT INTO statements.

# Add a new sql client option `sql-client.job.detach` .
Users prefer to execute jobs one by one in the batch mode. Users can

set

this option false and the client will process the next job until the
current job finishes. The default value of this option is false, which
means the client will execute the next job when the current job is
submitted.

Best,
Shengkai



Rui Li  于2021年1月29日周五 下午4:52写道:


Hi Shengkai,

Regarding #2, maybe the -f options in flink and hive have different
implications, and we should clarify the behavior. For example, if the
client just submits the job and exits, what happens if the file

contains

two INSERT statements? I don't think we should treat them as a

statement

set, because users should explicitly write BEGIN STATEMENT SET in that
case. And the client shouldn't asynchronously submit the two jobs,

because

the 2nd may depend on the 1st, right?

On Fri, Jan 29, 2021 at 4:30 PM Shengkai Fang 

wrote:



Hi Rui,
Thanks for your feedback. I agree with your suggestions.

For the suggestion 1: Yes, we plan to strengthen the SET

command. In

the implementation, it will just put the key-value into the
`Configuration`, which will be used to generate the table config. If

hive

supports to read the setting from 

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Fabian Hueske
Hi everyone,

Sorry for joining this discussion late.
Let me give some thought to two of the arguments raised in this thread.

Time functions are inherently non-deterministic:
--
This is of course true, but IMO it doesn't mean that the semantics of time
functions do not matter.
It makes a difference whether a function is evaluated once and its result
is reused or whether it is invoked for every record.
Would you use the same logic to justify different behavior of RAND() in
batch and streaming queries?

Provide the semantics that most users expect:
--
I don't think it is clear what most users expect, esp. if we also include
future users (which we certainly want to gain) into this assessment.
Our current users got used to the semantics that we introduced. So I
wouldn't be surprised if they would say stick with the current semantics.
However, we are also claiming standard SQL compliance and stress the goal
of batch-stream unification.
So I would assume that new SQL users expect standard compliant behavior for
batch and streaming queries.


IMO, we should try hard to stick to our goals of 1) unified batch-streaming
semantics and 2) SQL standard compliance.
For me this means that the semantics of the functions should be adjusted to
be evaluated at query start by default for batch and streaming queries.
Obviously this would affect *many* current users of streaming SQL.
For those we should provide two solutions:

1) Add alternative methods that provide the current behavior of the time
functions.
I like Timo's proposal to add a prefix like SYS_ (or PROC_) but don't care
too much about the names.
The important point is that users need alternative functions to provide the
desired semantics.

2) Add a configuration option to reestablish the current behavior of the
time functions.
IMO, the configuration option should not be considered as a permanent
option but rather as a migration path towards the "right" (standard
compliant) behavior.

Best, Fabian

Am Di., 2. Feb. 2021 um 09:51 Uhr schrieb Kurt Young :

> BTW I also don't like to introduce an option for this case as the
> first step.
>
> If we can find a default behavior which can make 90% of users happy, we should
> do it. If the remaining
> 10% of users start to complain about the fixed behavior (it's also
> possible that they never complain),
> we could offer an option to make them happy. If it turns out that we had
> misjudged the users'
> expectations, we should change the default behavior.
>
> Best,
> Kurt
>
>
> On Tue, Feb 2, 2021 at 4:46 PM Kurt Young  wrote:
>
> > Hi Timo,
> >
> > I don't think batch-stream unification can deal with all the cases,
> > especially if
> > the query involves some non-deterministic functions.
> >
> > No matter which option we choose, these queries will have
> > different results.
> > For example, if we run the same query in batch mode multiple times, it's
> > also
> > highly possible that we get different results. Does that mean all the
> > database
> > vendors can't deliver batch-batch unification? I don't think so.
> >
> > What's really important here is the user's intuition. What do users
> expect
> > if
> > they don't read any documentation about these functions? For batch users, I
> > think
> > it's already clear enough that all other systems and databases will
> > evaluate
> > these functions during query start. And for streaming users, I have
> > already seen
> > some users expecting these functions to be calculated per record.
> >
> > Thus I think we can determine the behavior together with the execution
> > mode.
> > One exception would be PROCTIME(), I think all users would expect this
> > function
> > to be calculated for each record. I think SYS_CURRENT_TIMESTAMP is
> > similar
> > to PROCTIME(), so we don't have to introduce it.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Feb 2, 2021 at 4:20 PM Timo Walther  wrote:
> >
> >> Hi everyone,
> >>
> >> I'm not sure if we should introduce the `auto` mode. Taking all the
> >> previous discussions around batch-stream unification into account, batch
> >> mode and streaming mode should only influence the runtime efficiency and
> >> incremental computation. The final query result should be the same in
> >> both modes. Also looking into the long-term future, we might drop the
> >> mode property and either derive the mode or use different modes for
> >> parts of the pipeline.
> >>
> >> "I think we may need to think more from the users' perspective."
> >>
> >> I agree here and that's why I actually would like to let the user decide
> >> which semantics are needed. The config option proposal was my least
> >> favored alternative. We should stick to the standard and behavior of
> >> other systems. For both batch and streaming. And use a simple prefix to
> >> let users decide whether the semantics are per-record or per-query:
> >>
> >> CURRENT_TIMESTAMP   -- semantics as all other vendors
> >>
> >>
> >> _CURRENT_TIMESTAMP  -- semantics per record
> 

[jira] [Created] (FLINK-21239) Upgrade Calcite version to 1.28

2021-02-02 Thread Timo Walther (Jira)
Timo Walther created FLINK-21239:


 Summary: Upgrade Calcite version to 1.28
 Key: FLINK-21239
 URL: https://issues.apache.org/jira/browse/FLINK-21239
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Reporter: Timo Walther


The following files should be removed from the Flink code base during an 
upgrade:
- org.apache.calcite.rex.RexLiteral



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] FLINK-21110: Optimize Scheduler Performance for Large-Scale Jobs

2021-02-02 Thread Zhilong Hong
Hello, everyone:

I would like to start the discussion about FLINK-21110: Optimize Scheduler 
Performance for Large-Scale Jobs [1].

According to the result of scheduler benchmarks we implemented in FLINK-20612 
[2], the bottleneck of deploying and running a large-scale job in Flink
mainly lies in the following procedures:

Procedure                                             Time complexity
Initializing ExecutionGraph                           O(N^2)
Building DefaultExecutionTopology                     O(N^2)
Initializing PipelinedRegionSchedulingStrategy        O(N^2)
Scheduling downstream tasks when a task finishes      O(N^2)
Calculating tasks to restart when a failover occurs   O(N^2)
Releasing result partitions                           O(N^2)

These procedures are all related to the complexity of the topology in the 
ExecutionGraph. Between two vertices connected with the all-to-all edges, all 
the upstream Intermediate ResultPartitions are connected to all downstream 
ExecutionVertices. The computation complexity of building and traversing all 
these edges will be O(N^2).

As for memory usage, currently we use ExecutionEdges to store the information 
of connections. For the all-to-all distribution type, there are O(N^2) 
ExecutionEdges. We tested a simple job with only two vertices. The parallelisms
of both are 10k. Furthermore, they are connected with all-to-all edges. It
takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.

In most large-scale jobs, there will be more than two vertices with large 
parallelisms, and they would cost a lot of time and memory to deploy the job.

As we can see, for two JobVertices connected with the all-to-all distribution 
type, all IntermediateResultPartitions produced by the upstream 
ExecutionVertices are isomorphic, which means that the downstream 
ExecutionVertices they connected are exactly the same. The downstream 
ExecutionVertices belonging to the same JobVertex are also isomorphic, as the 
upstream ResultPartitions they connect are the same, too.

Since every JobEdge has exactly one distribution type, we can divide the 
vertices and result partitions into groups according to the distribution type 
of the JobEdge.

For the all-to-all distribution type, since all downstream vertices are 
isomorphic, they belong to a single group, and all the upstream result 
partitions are connected to this group. Vice versa, all the upstream result 
partitions also belong to a single group, and all the downstream vertices are 
connected to this group. In the past, when we wanted to iterate all the 
downstream vertices, we needed to loop over them n times, which leads to the 
complexity of O(N^2). Now since all upstream result partitions are connected to 
one downstream group, we just need to loop over them once, with the complexity 
of O(N).

For the pointwise distribution type, because each result partition is connected 
to different downstream vertices, they should belong to different groups. Vice 
versa, all the vertices belong to different groups. Since one result partition 
group is connected to one vertex group in a pointwise manner, the computation complexity
of looping over them is still O(N).
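
To make the grouping idea concrete, here is a minimal sketch (hypothetical
class names, not the actual Flink implementation):

```
import java.util.ArrayList;
import java.util.List;

// All upstream partitions of an all-to-all edge share ONE consumer group,
// so the topology stores O(N) references instead of O(N^2) ExecutionEdges.
final class ConsumerVertexGroup {
    final List<String> vertexIds = new ArrayList<>();
}

final class GroupingSketch {
    public static void main(String[] args) {
        ConsumerVertexGroup group = new ConsumerVertexGroup();
        group.vertexIds.add("sink-1");
        group.vertexIds.add("sink-2");

        // Every upstream partition holds the same group reference...
        List<ConsumerVertexGroup> partitionToGroup =
                List.of(group, group, group); // three partitions, one group

        // ...so scheduling all consumers is a single O(N) loop over the group
        // instead of an O(N^2) loop over per-partition edge lists.
        for (String vertexId : partitionToGroup.get(0).vertexIds) {
            System.out.println("schedule " + vertexId);
        }
    }
}
```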

After we group the result partitions and vertices, ExecutionEdge is no longer 
needed. For the test job we mentioned above, the optimization can effectively 
reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in 
our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 
10k parallelism).

The detailed design doc with illustrations is located at [3]. Please find more 
details in the links below.

Looking forward to your feedback.

[1] https://issues.apache.org/jira/browse/FLINK-21110
[2] https://issues.apache.org/jira/browse/FLINK-20612
[3] 
https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing



Re: [VOTE] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-02 Thread Chesnay Schepler

+1

On 2/2/2021 10:11 AM, Till Rohrmann wrote:

+1 (binding)

Cheers,
Till

On Mon, Feb 1, 2021 at 5:38 AM Xintong Song  wrote:


+1 (binding)

Thank you~

Xintong Song



On Mon, Feb 1, 2021 at 11:56 AM Yangze Guo  wrote:


Hi everyone,

I'd like to start the vote of FLIP-156 [1]. This FLIP is discussed in
the thread[2].

The vote will be open for at least 72 hours. Unless there is an

objection,

I will try to close it by February 4, 2021 if we have received
sufficient votes.

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements

[2]


http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-156-Runtime-Interfaces-for-Fine-Grained-Resource-Requirements-td47650.html

Best,
Yangze Guo





Re: Proposal to add Google Cloud Storage FileSystem with RecoverableWriter

2021-02-02 Thread Till Rohrmann
Hi Galen,

I think that adding support for GCS using the StreamingFileSink sounds like
a very good idea to me. Looking at FLINK-11838 I believe that this effort
has been abandoned. I think that you could take this ticket over if you
want. Maybe you could update this ticket with your solution proposal.

I will check whether I can find a committer who could help you with this
effort.

Cheers,
Till

On Sat, Jan 30, 2021 at 7:43 PM Galen Warren 
wrote:

> Hi -- I'm wondering if you would be interested in a contribution to add a
> HadoopFileSystem implementation, with associated RecoverableWriter, for
> Google Cloud Storage. This would be similar to what's already in place for
> S3, and it would allow writing to GCS using a StreamingFileSink.
>
> I see there's been some work on this before (FLINK-11838: Add GCS
> RecoverableWriter by Fokko · Pull Request #7915 · apache/flink,
> https://github.com/apache/flink/pull/7915), but the original people
> working on it have put it on hold, and the last activity was over six
> months ago.
>
> I need this for my own purposes and I have an implementation that I'm
> working on locally. I'd be interested to contribute this if you'd be
> interested. Let me know if so and I'll create a Jira ticket.
>
> Thanks,
> Galen Warren
>
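
To illustrate the intended usage, a hedged sketch (it assumes a GCS
filesystem plugin registered for the gs:// scheme, which is exactly what
this proposal would add; the bucket name is made up):

```
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class GcsSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once row-format sink; needs a RecoverableWriter for gs://.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("gs://my-bucket/output"),
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("gcs-sink-sketch");
    }
}
```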


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-02 Thread Shengkai Fang
Sorry for the typo. I mean `RESET` is much better than `UNSET`.

Shengkai Fang  于2021年2月2日周二 下午4:44写道:

> Hi, Jingsong.
>
> Thanks for your reply. I think `UNSET` is much better.
>
> 1. We don't need to introduce another command `UNSET`. `RESET` is
> supported in the current sql client now. Our proposal just extends its
> grammar and allows users to reset the specified keys.
> 2. Hive beeline also uses `RESET` to set the key to the default value[1].
> I think it is more friendly for batch users.
>
> Best,
> Shengkai
>
> [1] https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients
>
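
To illustrate, a minimal sketch (hypothetical helper code, not the actual
SQL Client implementation) of what SET and the extended RESET would do to
the session `Configuration`:

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ResetSketch {
    public static void main(String[] args) {
        Configuration sessionConf = new Configuration();
        ConfigOption<String> planner =
                ConfigOptions.key("table.planner").stringType().noDefaultValue();

        // SET table.planner=blink;  -- store a session override
        sessionConf.setString("table.planner", "blink");

        // RESET table.planner;  -- proposed: drop only this key, so the
        // default value applies again
        sessionConf.removeConfig(planner);

        // RESET;  -- existing behavior: clear all session overrides
        sessionConf = new Configuration();
    }
}
```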
> Jingsong Li  于2021年2月2日周二 下午1:56写道:
>
>> Thanks for the proposal, yes, sql-client is too outdated. +1 for
>> improving it.
>>
>> About "SET"  and "RESET", Why not be "SET" and "UNSET"?
>>
>> Best,
>> Jingsong
>>
>> On Mon, Feb 1, 2021 at 2:46 PM Rui Li  wrote:
>>
>>> Thanks Shengkai for the update! The proposed changes look good to me.
>>>
>>> On Fri, Jan 29, 2021 at 8:26 PM Shengkai Fang  wrote:
>>>
>>> > Hi, Rui.
>>> > You are right. I have already modified the FLIP.
>>> >
>>> > The main changes:
>>> >
>>> > # -f parameter has no restriction about the statement type.
>>> > Sometimes, users use the pipe to redirect the result of queries to
>>> debug
>>> > when submitting a job with the -f parameter. It's much more convenient compared to
>>> > writing INSERT INTO statements.
>>> >
>>> > # Add a new sql client option `sql-client.job.detach` .
>>> > Users prefer to execute jobs one by one in the batch mode. Users can
>>> set
>>> > this option false and the client will not process the next job until the
>>> > current job finishes. The default value of this option is false, which
>>> > means the client will execute the next job when the current job is
>>> > submitted.
>>> >
>>> > Best,
>>> > Shengkai
>>> >
>>> >
>>> >
>>> > Rui Li  于2021年1月29日周五 下午4:52写道:
>>> >
>>> >> Hi Shengkai,
>>> >>
>>> >> Regarding #2, maybe the -f options in flink and hive have different
>>> >> implications, and we should clarify the behavior. For example, if the
>>> >> client just submits the job and exits, what happens if the file
>>> contains
>>> >> two INSERT statements? I don't think we should treat them as a
>>> statement
>>> >> set, because users should explicitly write BEGIN STATEMENT SET in that
>>> >> case. And the client shouldn't asynchronously submit the two jobs,
>>> because
>>> >> the 2nd may depend on the 1st, right?
>>> >>
>>> >> On Fri, Jan 29, 2021 at 4:30 PM Shengkai Fang 
>>> wrote:
>>> >>
>>> >>> Hi Rui,
>>> >>> Thanks for your feedback. I agree with your suggestions.
>>> >>>
>>> >>> For the suggestion 1: Yes, we plan to strengthen the SET
>>> command. In
>>> >>> the implementation, it will just put the key-value into the
>>> >>> `Configuration`, which will be used to generate the table config. If
>>> hive
>>> >>> supports to read the setting from the table config, users are able
>>> to set
>>> >>> the hive-related settings.
>>> >>>
>>> >>> For the suggestion 2: The -f parameter will submit the job and exit.
>>> If
>>> >>> the queries never end, users have to cancel the job by themselves,
>>> which is
>>> >>> not reliable (people may forget their jobs). In most cases, queries
>>> are used
>>> >>> to analyze the data. Users should use queries in the interactive
>>> mode.
>>> >>>
>>> >>> Best,
>>> >>> Shengkai
>>> >>>
>>> >>> Rui Li  于2021年1月29日周五 下午3:18写道:
>>> >>>
>>>  Thanks Shengkai for bringing up this discussion. I think it covers a
>>>  lot of useful features which will dramatically improve the
>>> usability of our
>>>  SQL Client. I have two questions regarding the FLIP.
>>> 
>>>  1. Do you think we can let users set arbitrary configurations via
>>> the
>>>  SET command? A connector may have its own configurations and we
>>> don't have
>>>  a way to dynamically change such configurations in SQL Client. For
>>> example,
>>>  users may want to be able to change hive conf when using hive
>>> connector [1].
>>>  2. Any reason why we have to forbid queries in SQL files specified
>>> with
>>>  the -f option? Hive supports a similar -f option but allows queries
>>> in the
>>>  file. And a common use case is to run some query and redirect the
>>> results
>>>  to a file. So I think maybe flink users would like to do the same,
>>>  especially in batch scenarios.
>>> 
>>>  [1] https://issues.apache.org/jira/browse/FLINK-20590
>>> 
>>>  On Fri, Jan 29, 2021 at 10:46 AM Sebastian Liu <
>>> liuyang0...@gmail.com>
>>>  wrote:
>>> 
>>> > Hi Shengkai,
>>> >
>>> > Glad to see this improvement. And I have some additional
>>> suggestions:
>>> >
>>> > #1. Unify the TableEnvironment in ExecutionContext to
>>> > StreamTableEnvironment for both streaming and batch sql.
>>> > #2. Improve the way of results retrieval: sql client collect the
>>> > results
>>> > locally all at once using accumulators at present,
>>> >   which may have 

Re: [VOTE] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-02 Thread Till Rohrmann
+1 (binding)

Cheers,
Till

On Mon, Feb 1, 2021 at 5:38 AM Xintong Song  wrote:

> +1 (binding)
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Feb 1, 2021 at 11:56 AM Yangze Guo  wrote:
>
> > Hi everyone,
> >
> > I'd like to start the vote of FLIP-156 [1]. This FLIP is discussed in
> > the thread[2].
> >
> > The vote will be open for at least 72 hours. Unless there is an
> objection,
> > I will try to close it by February 4, 2021 if we have received
> > sufficient votes.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
> > [2]
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-156-Runtime-Interfaces-for-Fine-Grained-Resource-Requirements-td47650.html
> >
> > Best,
> > Yangze Guo
> >
>


Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2021-02-02 Thread Jingsong Li
Thanks Rui for the proposal, I think this FLIP is required by many users,
and it is very good for traditional Hive users. I have a few questions:

# Version

Which Hive version do you want to choose? Maybe, Hive 3.X and Hive 2.X have
some differences?

# Hive Codes

Can you evaluate how much code we need to copy to our flink-hive-connector?
Do we need to change them? We need to maintain them anyway.

# Functions

About Hive functions, I don't think it is a limitation, we are using
HiveModule to be compatible with Hive, right? So it is a solution instead
of a limitation.

# Keywords

Do you think there will be a keyword problem? Or can we be 100% compatible
with Hive?

On the whole, the FLIP looks very good and I'm looking forward to it.

Best,
Jingsong

On Fri, Dec 11, 2020 at 11:35 AM Zhijiang
 wrote:

> Thanks for the further info and explanations! I have no other concerns.
>
> Best,
> Zhijiang
>
>
> --
> From:Rui Li 
> Send Time:2020年12月10日(星期四) 20:35
> To:dev ; Zhijiang 
> Subject:Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility
>
> Hi Zhijiang,
>
> Glad to know you're interested in this FLIP. I wouldn't claim 100%
> compatibility with this FLIP. That's because Flink doesn't have the
> functionalities to support all Hive's features. To list a few examples:
>
>1. Hive allows users to process data with shell scripts -- very similar
>to UDFs [1]
>2. Users can compile inline Groovy UDFs and use them in queries [2]
>3. Users can dynamically add/delete jars, or even execute arbitrary
>shell command [3]
>
> These features cannot be supported merely by a parser/planner, and it's
> open to discussion whether Flink even should support them at all.
>
> So the ultimate goal of this FLIP is to provide Hive syntax compatibility
> to features that are already available in Flink, which I believe will cover
> most common use cases.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-TRANSFORMExamples
> [2]
>
> https://community.cloudera.com/t5/Community-Articles/Apache-Hive-Groovy-UDF-examples/ta-p/245060
> [3]
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveInteractiveShellCommands
>
> On Thu, Dec 10, 2020 at 6:11 PM Zhijiang wrote:
>
> > Thanks for launching the discussion and the FLIP, Rui!
> >
> > It is really nice to see our continuous efforts for compatibility with
> > Hive, benefiting users in this area.
> > I am only curious: are there any other compatibility limitations for
> > Hive users after this FLIP? Or can I say that Hive compatibility is
> > completely resolved after this FLIP?
> > I am interested in the ultimate goal in this area. Maybe it is out of
> > this FLIP's scope, but I still wish for some insights from you if
> > possible. :)
> >
> > Best,
> > Zhijiang
> >
> >
> > --
> > From: Rui Li
> > Send Time: Thursday, December 10, 2020, 16:46
> > To: dev
> > Subject:Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility
> >
> > Thanks Kurt for your inputs!
> >
> > I agree we should extend Hive code to support non-Hive tables. I have
> > updated the wiki page to remove the limitations you mentioned, and added
> > typical use cases in the "Motivation" section.
> >
> > Regarding comment #b, the interface is defined in
> flink-table-planner-blink
> > and only used by the blink planner. So I think "BlinkParserFactory" is a
> > better name, WDYT?
> >
> > On Mon, Dec 7, 2020 at 12:28 PM Kurt Young  wrote:
> >
> > > Thanks Rui for starting this discussion.
> > >
> > > I can see the benefit that we improve hive compatibility further, as
> > quite
> > > some users are asking for this
> > > feature in mailing lists [1][2][3] and some online chatting tools such
> as
> > > DingTalk.
> > >
> > > I have 3 comments regarding the design doc:
> > >
> > > a) Could you add a section to describe the typical use case you want to
> > > support after this feature is introduced?
> > > In that way, users can also have an impression of how to use this
> > > feature and what the behavior and outcome will be.
> > >
> > > b) Regarding the naming: "BlinkParserFactory", I suggest renaming it to
> > > "FlinkParserFactory".
> > >
> > > c) About the two limitations you mentioned:
> > > 1. Only works with Hive tables and the current catalog needs to be a
> > > HiveCatalog.
> > > 2. Queries cannot involve tables/views from multiple catalogs.
> > > I assume this is because Hive's parser and analyzer don't support
> > > referring to a name in "x.y.z" fashion? Since we can control all the
> > > behaviors by leveraging the code Hive currently uses, is it possible
> > > that we can remove such limitations? The reason is I'm not sure if
> > > users can make the whole story work purely depending on the Hive
> > > catalog (that's the reason why I gave comment #a). If multiple 

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Kurt Young
BTW, I also don't like introducing an option for this case as the
first step.

If we can find a default behavior which can make 90% of users happy, we
should do it. If the remaining 10% of users start to complain about the
fixed behavior (it's also possible that they never complain), we could
offer an option to make them happy. If it turns out that we had a wrong
estimation of users' expectations, we should change the default behavior.

Best,
Kurt


On Tue, Feb 2, 2021 at 4:46 PM Kurt Young  wrote:

> Hi Timo,
>
> I don't think batch-stream unification can deal with all the cases,
> especially if the query involves some non-deterministic functions.
>
> No matter which option we choose, these queries will have different
> results. For example, if we run the same query in batch mode multiple
> times, it's also highly possible that we get different results. Does that
> mean all the database vendors can't deliver batch-batch unification? I
> don't think so.
>
> What's really important here is the user's intuition. What do users
> expect if they don't read any documents about these functions? For batch
> users, I think it's already clear enough that all other systems and
> databases will evaluate these functions at query start. And for streaming
> users, I have already seen some users expecting these functions to be
> calculated per record.
>
> Thus I think we can make the behavior determined by the execution mode.
> One exception would be PROCTIME(): I think all users would expect this
> function to be calculated for each record. I think SYS_CURRENT_TIMESTAMP
> is similar to PROCTIME(), so we don't have to introduce it.
>
> Best,
> Kurt
>
>
> On Tue, Feb 2, 2021 at 4:20 PM Timo Walther  wrote:
>
>> Hi everyone,
>>
>> I'm not sure if we should introduce the `auto` mode. Taking all the
>> previous discussions around batch-stream unification into account, batch
>> mode and streaming mode should only influence the runtime efficiency and
>> incremental computation. The final query result should be the same in
>> both modes. Also looking into the long-term future, we might drop the
>> mode property and either derive the mode or use different modes for
>> parts of the pipeline.
>>
>> "I think we may need to think more from the users' perspective."
>>
>> I agree here and that's why I actually would like to let the user decide
>> which semantics are needed. The config option proposal was my least
>> favored alternative. We should stick to the standard and behavior of
>> other systems. For both batch and streaming. And use a simple prefix to
>> let users decide whether the semantics are per-record or per-query:
>>
>> CURRENT_TIMESTAMP   -- semantics as all other vendors
>>
>>
>> _CURRENT_TIMESTAMP  -- semantics per record
>>
>> OR
>>
>> SYS_CURRENT_TIMESTAMP  -- semantics per record
>>
>>
>> Please check how other vendors are handling this:
>>
>> SYSDATE  MySQL, Oracle
>> SYSDATETIME  SQL Server
>>
>>
>> Regards,
>> Timo
>>
>>
>> On 02.02.21 07:02, Jingsong Li wrote:
>> > +1 for the default "auto" to the "table.exec.time-function-evaluation".
>> >
>> > From the definition of these functions, in my opinion:
>> > - Batch is the instant execution of all records, which is the meaning of
>> > the word "BATCH", so there is only one time at query-start.
>> > - Stream only executes a single record in a moment, so time is generated
>> > by each record.
>> >
>> > On the other hand, we should be more careful about consistency with
>> other
>> > systems.
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Tue, Feb 2, 2021 at 11:24 AM Jark Wu  wrote:
>> >
>> >> Hi Leonard, Timo,
>> >>
>> >> I just did some investigation and found all the other batch processing
>> >> systems evaluate the time functions at query-start, including
>> >> Snowflake, Hive, Spark, Trino.
>> >> I'm wondering whether the default 'per-record' mode will still be weird
>> >> for batch users.
>> >> I know we proposed the option for batch users to change the behavior.
>> >> However, if 90% of users need to set this config before submitting
>> >> batch jobs, why not use this mode for batch by default? For the other
>> >> 10% of special users, they can still set the config to per-record
>> >> before submitting batch jobs. I believe this can greatly improve the
>> >> usability for batch cases.
>> >>
>> >> Therefore, what do you think about using "auto" as the default option
>> >> value?
>> >>
>> >> It evaluates time functions per-record in streaming mode and evaluates
>> >> at query start in batch mode. I think this can make both streaming
>> >> users and batch users happy. IIUC, the reason why we propose the
>> >> default "per-record" mode is for batch-stream consistency. However, I
>> >> think time functions are special cases because they are naturally
>> >> non-deterministic. Even if streaming jobs and batch jobs all use
>> >> "per-record" mode, they still can't provide consistent results. Thus,
>> >> I think we may need to think more from the users' perspective.

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Kurt Young
Hi Timo,

I don't think batch-stream unification can deal with all the cases,
especially if the query involves some non-deterministic functions.

No matter which option we choose, these queries will have different results.
For example, if we run the same query in batch mode multiple times, it's
also highly possible that we get different results. Does that mean all the
database vendors can't deliver batch-batch unification? I don't think so.

What's really important here is the user's intuition. What do users expect
if they don't read any documents about these functions? For batch users, I
think it's already clear enough that all other systems and databases will
evaluate these functions at query start. And for streaming users, I have
already seen some users expecting these functions to be calculated per
record.

Thus I think we can make the behavior determined by the execution mode.
One exception would be PROCTIME(): I think all users would expect this
function to be calculated for each record. I think SYS_CURRENT_TIMESTAMP
is similar to PROCTIME(), so we don't have to introduce it.
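To make this concrete, here is how I would expect the functions to behave
if we bind them to the execution mode (a sketch only; the table and column
names are made up):

```
-- batch mode: evaluated once at query start, like other systems
SELECT order_id, CURRENT_TIMESTAMP FROM orders;  -- same timestamp for every row

-- streaming mode: evaluated per record
SELECT order_id, CURRENT_TIMESTAMP FROM orders;  -- timestamp advances with each record

-- PROCTIME() stays per-record in either mode
SELECT order_id, PROCTIME() AS proc_time FROM orders;
```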

Best,
Kurt


On Tue, Feb 2, 2021 at 4:20 PM Timo Walther  wrote:

> Hi everyone,
>
> I'm not sure if we should introduce the `auto` mode. Taking all the
> previous discussions around batch-stream unification into account, batch
> mode and streaming mode should only influence the runtime efficiency and
> incremental computation. The final query result should be the same in
> both modes. Also looking into the long-term future, we might drop the
> mode property and either derive the mode or use different modes for
> parts of the pipeline.
>
> "I think we may need to think more from the users' perspective."
>
> I agree here and that's why I actually would like to let the user decide
> which semantics are needed. The config option proposal was my least
> favored alternative. We should stick to the standard and behavior of
> other systems. For both batch and streaming. And use a simple prefix to
> let users decide whether the semantics are per-record or per-query:
>
> CURRENT_TIMESTAMP   -- semantics as all other vendors
>
>
> _CURRENT_TIMESTAMP  -- semantics per record
>
> OR
>
> SYS_CURRENT_TIMESTAMP  -- semantics per record
>
>
> Please check how other vendors are handling this:
>
> SYSDATE  MySQL, Oracle
> SYSDATETIME  SQL Server
>
>
> Regards,
> Timo
>
>
> On 02.02.21 07:02, Jingsong Li wrote:
> > +1 for the default "auto" to the "table.exec.time-function-evaluation".
> >
> > From the definition of these functions, in my opinion:
> > - Batch is the instant execution of all records, which is the meaning of
> > the word "BATCH", so there is only one time at query-start.
> > - Stream only executes a single record in a moment, so time is generated
> > by each record.
> >
> > On the other hand, we should be more careful about consistency with other
> > systems.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Feb 2, 2021 at 11:24 AM Jark Wu  wrote:
> >
> >> Hi Leonard, Timo,
> >>
> >> I just did some investigation and found all the other batch processing
> >> systems evaluate the time functions at query-start, including
> >> Snowflake, Hive, Spark, Trino.
> >> I'm wondering whether the default 'per-record' mode will still be weird
> >> for batch users.
> >> I know we proposed the option for batch users to change the behavior.
> >> However, if 90% of users need to set this config before submitting
> >> batch jobs, why not use this mode for batch by default? For the other
> >> 10% of special users, they can still set the config to per-record
> >> before submitting batch jobs. I believe this can greatly improve the
> >> usability for batch cases.
> >>
> >> Therefore, what do you think about using "auto" as the default option
> >> value?
> >>
> >> It evaluates time functions per-record in streaming mode and evaluates
> >> at query start in batch mode. I think this can make both streaming
> >> users and batch users happy. IIUC, the reason why we propose the
> >> default "per-record" mode is for batch-stream consistency. However, I
> >> think time functions are special cases because they are naturally
> >> non-deterministic. Even if streaming jobs and batch jobs all use
> >> "per-record" mode, they still can't provide consistent results. Thus,
> >> I think we may need to think more from the users' perspective.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 1 Feb 2021 at 23:06, Timo Walther  wrote:
> >>
> >>> Hi Leonard,
> >>>
> >>> thanks for considering this issue as well. +1 for the proposed config
> >>> option. Let's start a voting thread once the FLIP document has been
> >>> updated if there are no other concerns?
> >>>
> >>> Thanks,
> >>> Timo
> >>>
> >>>
> >>> On 01.02.21 15:07, Leonard Xu wrote:
> >>>> Hi, all
> >>>>
> >>>> I’ve discussed with @Timo and @Jark about the time function evaluation
> >>>> further. We reached a consensus that we’d better address the time
> >>>> function evaluation (function value materialization) in this FLIP as
> >>>> well.

Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-02 Thread Shengkai Fang
Hi, Jingsong.

Thanks for your reply, but I think `RESET` is the better choice here:

1. We don't need to introduce another command `UNSET`. `RESET` is already
supported in the current SQL client. Our proposal just extends its grammar
and allows users to reset the specified keys.
2. Hive Beeline also uses `RESET` to set a key back to its default
value [1]. I think it is more friendly for batch users.
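A quick sketch of the extended grammar (the key below is only an example):

```
-- set a property for the current session
SET table.optimizer.join-reorder-enabled = true;

-- proposed extension: reset only the specified key to its default value
RESET table.optimizer.join-reorder-enabled;

-- reset all properties, as the current SQL client already does
RESET;
```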

Best,
Shengkai

[1] https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients

On Tue, Feb 2, 2021 at 1:56 PM, Jingsong Li wrote:

> Thanks for the proposal, yes, sql-client is too outdated. +1 for improving
> it.
>
> About "SET"  and "RESET", Why not be "SET" and "UNSET"?
>
> Best,
> Jingsong
>
> On Mon, Feb 1, 2021 at 2:46 PM Rui Li  wrote:
>
>> Thanks Shengkai for the update! The proposed changes look good to me.
>>
>> On Fri, Jan 29, 2021 at 8:26 PM Shengkai Fang  wrote:
>>
>> > Hi, Rui.
>> > You are right. I have already modified the FLIP.
>> >
>> > The main changes:
>> >
>> > # The -f parameter has no restriction on the statement type.
>> > Sometimes users pipe the results of queries to a file for debugging
>> > when submitting jobs with the -f parameter. It's much more convenient
>> > than writing INSERT INTO statements.
>> >
>> > # Add a new SQL client option `sql-client.job.detach`.
>> > Users prefer to execute jobs one by one in batch mode. Users can set
>> > this option to false so that the client will not process the next job
>> > until the current job finishes. The default value of this option is
>> > true, which means the client will execute the next job as soon as the
>> > current job is submitted.
>> >
>> > Best,
>> > Shengkai
>> >
>> >
>> >
>> > On Fri, Jan 29, 2021 at 4:52 PM, Rui Li wrote:
>> >
>> >> Hi Shengkai,
>> >>
>> >> Regarding #2, maybe the -f options in flink and hive have different
>> >> implications, and we should clarify the behavior. For example, if the
>> >> client just submits the job and exits, what happens if the file
>> contains
>> >> two INSERT statements? I don't think we should treat them as a
>> statement
>> >> set, because users should explicitly write BEGIN STATEMENT SET in that
>> >> case. And the client shouldn't asynchronously submit the two jobs,
>> because
>> >> the 2nd may depend on the 1st, right?
>> >>
>> >> On Fri, Jan 29, 2021 at 4:30 PM Shengkai Fang 
>> wrote:
>> >>
>> >>> Hi Rui,
>> >>> Thanks for your feedback. I agree with your suggestions.
>> >>>
>> >>> For suggestion 1: Yes, we plan to strengthen the SET command. In
>> >>> the implementation, it will just put the key-value pair into the
>> >>> `Configuration`, which will be used to generate the table config. If
>> >>> Hive supports reading the settings from the table config, users will
>> >>> be able to set the Hive-related settings.
>> >>>
>> >>> For suggestion 2: The -f parameter will submit the job and exit. If
>> >>> the queries never end, users have to cancel the jobs by themselves,
>> >>> which is not reliable (people may forget their jobs). In most cases,
>> >>> queries are used to analyze data. Users should run queries in the
>> >>> interactive mode.
>> >>>
>> >>> Best,
>> >>> Shengkai
>> >>>
>> >>> On Fri, Jan 29, 2021 at 3:18 PM, Rui Li wrote:
>> >>>
>>  Thanks Shengkai for bringing up this discussion. I think it covers a
>>  lot of useful features which will dramatically improve the usability
>> of our
>>  SQL Client. I have two questions regarding the FLIP.
>> 
>>  1. Do you think we can let users set arbitrary configurations via the
>>  SET command? A connector may have its own configurations and we
>> don't have
>>  a way to dynamically change such configurations in SQL Client. For
>> example,
>>  users may want to be able to change hive conf when using hive
>> connector [1].
>>  2. Any reason why we have to forbid queries in SQL files specified
>> with
>>  the -f option? Hive supports a similar -f option but allows queries
>> in the
>>  file. And a common use case is to run some query and redirect the
>> results
>>  to a file. So I think maybe flink users would like to do the same,
>>  especially in batch scenarios.
>> 
>>  [1] https://issues.apache.org/jira/browse/FLINK-20590
>> 
>>  On Fri, Jan 29, 2021 at 10:46 AM Sebastian Liu <liuyang0...@gmail.com> wrote:
>> 
>> > Hi Shengkai,
>> >
>> > Glad to see this improvement. And I have some additional
>> suggestions:
>> >
>> > #1. Unify the TableEnvironment in ExecutionContext to
>> > StreamTableEnvironment for both streaming and batch sql.
>> > #2. Improve the way of results retrieval: the SQL client collects the
>> > results locally all at once using accumulators at present,
>> > which may have memory issues in the JM or locally for big query results.
>> > Accumulators are only suitable for testing purposes.
>> > We may change to use SelectTableSink, which is based
>> > on CollectSinkOperatorCoordinator.
>> > #3. Do we need to consider 

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-02 Thread Timo Walther

Hi everyone,

I'm not sure if we should introduce the `auto` mode. Taking all the 
previous discussions around batch-stream unification into account, batch 
mode and streaming mode should only influence the runtime efficiency and 
incremental computation. The final query result should be the same in 
both modes. Also looking into the long-term future, we might drop the 
mode property and either derive the mode or use different modes for 
parts of the pipeline.


"I think we may need to think more from the users' perspective."

I agree here and that's why I actually would like to let the user decide 
which semantics are needed. The config option proposal was my least 
favored alternative. We should stick to the standard and behavior of 
other systems. For both batch and streaming. And use a simple prefix to 
let users decide whether the semantics are per-record or per-query:


CURRENT_TIMESTAMP   -- semantics as all other vendors


_CURRENT_TIMESTAMP  -- semantics per record

OR

SYS_CURRENT_TIMESTAMP  -- semantics per record


Please check how other vendors are handling this:

SYSDATE  MySQL, Oracle
SYSDATETIME  SQL Server
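As a sketch of what this would look like in queries, if we went with the
SYS_ prefix (the table name is made up):

```
-- standard semantics: one value for the whole query, as in other vendors
SELECT CURRENT_TIMESTAMP, order_id FROM orders;

-- explicit per-record semantics via the dedicated function
SELECT SYS_CURRENT_TIMESTAMP, order_id FROM orders;
```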


Regards,
Timo


On 02.02.21 07:02, Jingsong Li wrote:

+1 for the default "auto" to the "table.exec.time-function-evaluation".


From the definition of these functions, in my opinion:

- Batch is the instant execution of all records, which is the meaning of
the word "BATCH", so there is only one time at query-start.
- Stream only executes a single record in a moment, so time is generated by
each record.

On the other hand, we should be more careful about consistency with other
systems.

Best,
Jingsong

On Tue, Feb 2, 2021 at 11:24 AM Jark Wu  wrote:


Hi Leonard, Timo,

I just did some investigation and found all the other batch processing
systems evaluate the time functions at query-start, including Snowflake,
Hive, Spark, Trino.
I'm wondering whether the default 'per-record' mode will still be weird
for batch users.
I know we proposed the option for batch users to change the behavior.
However, if 90% of users need to set this config before submitting batch
jobs, why not use this mode for batch by default? For the other 10% of
special users, they can still set the config to per-record before
submitting batch jobs. I believe this can greatly improve the usability
for batch cases.

Therefore, what do you think about using "auto" as the default option
value?

It evaluates time functions per-record in streaming mode and evaluates at
query start in batch mode. I think this can make both streaming users and
batch users happy. IIUC, the reason why we propose the default
"per-record" mode is for batch-stream consistency. However, I think time
functions are special cases because they are naturally non-deterministic.
Even if streaming jobs and batch jobs all use "per-record" mode, they
still can't provide consistent results. Thus, I think we may need to
think more from the users' perspective.

Best,
Jark


On Mon, 1 Feb 2021 at 23:06, Timo Walther  wrote:


Hi Leonard,

thanks for considering this issue as well. +1 for the proposed config
option. Let's start a voting thread once the FLIP document has been
updated if there are no other concerns?

Thanks,
Timo


On 01.02.21 15:07, Leonard Xu wrote:

Hi, all

I’ve discussed with @Timo and @Jark about the time function evaluation
further. We reached a consensus that we’d better address the time function
evaluation (function value materialization) in this FLIP as well.

We’re fine with introducing an option table.exec.time-function-evaluation
to control the point at which time function values are materialized. The
time functions include:

LOCALTIME
LOCALTIMESTAMP
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
NOW()

The default value of table.exec.time-function-evaluation is 'per-record',
which means Flink evaluates the function value per record; we recommend
that users configure this option value for their streaming pipelines.

Another valid option value is 'query-start', which means Flink evaluates
the function value at query start; we recommend that users configure this
option value for their batch pipelines.
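To make the two modes concrete, here is a sketch of a session (the table
name is made up):

```
-- default: evaluate the time functions for every record (streaming pipelines)
SET table.exec.time-function-evaluation = per-record;
SELECT CURRENT_TIMESTAMP FROM src;  -- the value advances record by record

-- evaluate the time functions once at query start (batch pipelines)
SET table.exec.time-function-evaluation = query-start;
SELECT CURRENT_TIMESTAMP FROM src;  -- one value for all rows
```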

In the future, more valid evaluation option values like 'auto' may be
supported if there are new requirements, e.g. an 'auto' option which
evaluates time function values per-record in streaming mode and at query
start in batch mode.

Alternative 1:
Introduce functions like CURRENT_TIMESTAMP2/CURRENT_TIMESTAMP_NOW which
evaluate the function value at query start. This may confuse users a bit
in that we would provide two similar functions with different return
values.

Alternative 2:
Do not introduce any configuration/function; control the function
evaluation by the pipeline execution mode. This may produce different
results when users run their streaming pipeline SQL as a batch pipeline
(e.g. backfilling), and users also cannot control the behavior of these
functions.


How do you think ?

Thanks,