Re: [DISCUSS] FLIP-27: Refactor Source Interface

2020-01-15 Thread Becket Qin
Hi Steven,

Unfortunately we were behind schedule and did not get this into 1.10... So
it will be in 1.11 instead.

Thanks,

Jiangjie (Becket) Qin

On Thu, Jan 16, 2020 at 10:39 AM Steven Wu  wrote:

> Becket, is FLIP-27 still on track to be released in 1.10?
>
> On Tue, Jan 7, 2020 at 7:04 PM Becket Qin  wrote:
>
> > Hi folks,
> >
> > Happy new year!
> >
> > Stephan and I chatted offline yesterday. After reading the email thread
> > again, I found that I have misunderstood Dawid's original proposal
> > regarding the behavior of env.source(BoundedSource) and had an incorrect
> > impression about the behavior of java covariant return type.
> > Anyways, I agree what Dawid originally proposed makes sense, which is the
> > following API:
> >
> > // Return a BoundedDataStream instance if the source is bounded.
> > // Return a DataStream instance if the source is unbounded.
> > DataStream env.source(Source);
> >
> > // Throws exception if the source is unbounded.
> > // Used when the user knows the source is bounded at programming time.
> > BoundedDataStream env.boundedSource(Source);
> >
> > A BoundedDataStream only runs in batch execution mode.
> > A DataStream only runs in streaming execution mode.
> >
> > To run a bounded source in streaming execution mode, one would do the
> > following:
> >
> > // Return a DataStream instance with a source that will stop at some point;
> > DataStream env.source(SourceUtils.asUnbounded(myBoundedSource));
> >
> > I'll update the FLIP wiki and resume the vote if there is no further
> > concerns.
> >
> > Apologies for the misunderstanding and thanks for all the patient
> > discussions.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Dec 23, 2019 at 8:00 AM Becket Qin  wrote:
> >
> > > Hi Steven,
> > >
> > > I think the current proposal is what you mentioned - a Kafka source
> > > that can be constructed in either BOUNDED or UNBOUNDED mode. And Flink
> > > can get the boundedness by invoking getBoundedness().
> > >
> > > So one can create a Kafka source by doing something like the following:
> > >
> > > new KafkaSource().startOffset().endOffset(); // A bounded instance.
> > > new KafkaSource().startOffset(); // An unbounded instance.
> > >
> > > If users want to have an UNBOUNDED Kafka source that stops at some
> > > point, they can wrap the BOUNDED Kafka source like below:
> > >
> > > SourceUtils.asUnbounded(new KafkaSource().startOffset().endOffset());
> > >
> > > The wrapped source would be an unbounded Kafka source that stops at the
> > > end offset.
> > >
> > > Does that make sense?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Dec 20, 2019 at 1:31 PM Jark Wu  wrote:
> > >
> > >> Hi,
> > >>
> > >> First of all, I think it is not called "UNBOUNDED"; according to
> > >> FLIP-27, it is called "CONTINUOUS_UNBOUNDED".
> > >> The description of Boundedness in FLIP-27 [1] states clearly what
> > >> Becket and I think.
> > >>
> > >> public enum Boundedness {
> > >>
> > >>     /**
> > >>      * A bounded source processes the data that is currently available
> > >>      * and will end after that.
> > >>      *
> > >>      * When a source produces a bounded stream, the runtime may activate
> > >>      * additional optimizations that are suitable only for bounded input.
> > >>      * Incorrectly producing unbounded data when the source is set to
> > >>      * produce a bounded stream will often result in programs that do not
> > >>      * output any results and may eventually fail due to runtime errors
> > >>      * (out of memory or storage).
> > >>      */
> > >>     BOUNDED,
> > >>
> > >>     /**
> > >>      * A continuous unbounded source continuously processes all data as
> > >>      * it comes.
> > >>      *
> > >>      * The source may run forever (until the program is terminated) or
> > >>      * might actually end at some point, based on some source-specific
> > >>      * conditions. Because that is not transparent to the runtime, the
> > >>      * runtime will use an execution mode for continuous unbounded
> > >>      * streams whenever this mode is chosen.
> > >>      */
> > >>     CONTINUOUS_UNBOUNDED
> > >> }
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> [1]:
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP-27:RefactorSourceInterface-Source
> > >>
> > >>
> > >>
> > >> On Fri, 20 Dec 2019 at 12:55, Steven Wu  wrote:
> > >>
> > >> > Becket,
> > >> >
> > >> > Regarding "UNBOUNDED source that stops at some point", I found it
> > >> difficult
> > >> > to grasp what UNBOUNDED really mean.
> > >> >
> > >> > If we want to use a Kafka source with an end/stop time, I guess you
> > >> > call it an UNBOUNDED Kafka source that stops (aka BOUNDED-streaming).
> > >> > The terminology is a little confusing to me. Maybe BOUNDED/UNBOUNDED
> > >> > shouldn't be used to categorize sources. Just call it a Kafka source
> > >> > and it can
> 

[jira] [Created] (FLINK-15611) KafkaITCase.testOneToOneSources fails on Travis

2020-01-15 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-15611:
--

 Summary: KafkaITCase.testOneToOneSources fails on Travis
 Key: FLINK-15611
 URL: https://issues.apache.org/jira/browse/FLINK-15611
 Project: Flink
  Issue Type: Bug
Reporter: Yangze Guo
 Fix For: 1.10.0


{{The test KafkaITCase.testOneToOneSources failed on Travis.}}
{code:java}
03:15:02,019 INFO  
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
Deleting topic scale-down-before-first-checkpoint
03:15:02,037 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 

Test 
testScaleDownBeforeFirstCheckpoint(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
 successfully run.

03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase
 - -
03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase
 - Shut down KafkaTestBase 
03:15:02,038 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase
 - -
03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase
 - -
03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase
 - KafkaTestBase finished
03:15:25,728 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase
 - -
03:15:25.731 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time 
elapsed: 245.845 s - in 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
03:15:26.099 [INFO] 
03:15:26.099 [INFO] Results:
03:15:26.099 [INFO] 
03:15:26.099 [ERROR] Failures: 
03:15:26.099 [ERROR]   
KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:862
 Test failed: Job execution failed.
{code}
https://api.travis-ci.com/v3/job/276124537/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15610) How to achieve the udf that the number of return column is uncertain

2020-01-15 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15610:
-

 Summary: How to achieve the udf that the number of return column 
is uncertain 
 Key: FLINK-15610
 URL: https://issues.apache.org/jira/browse/FLINK-15610
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Flink Sql Join, how to clear the sql join state?

2020-01-15 Thread Benchao Li
Hi LakeShen,

Maybe "Idle State Retention Time"[1] may help in your case.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
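
For example (a minimal sketch, assuming the TableConfig#setIdleStateRetentionTime
API available in recent Flink releases; the retention values below are made up
for illustration), the retention can be configured before submitting the join
query:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Join state for keys that have been idle longer than the maximum
        // retention time is cleaned up, so the state no longer grows forever.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
        // ... register tables and run the inner join query as usual ...
    }
}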

LakeShen  于2020年1月16日周四 上午10:15写道:

> Hi community, I am now using Flink SQL inner join in my code. I saw in the
> Flink documentation that the Flink SQL inner join will keep both sides of
> the join input in Flink’s state forever.
> As a result, the HDFS files grow very big. Is there any way to clear the
> SQL join state?
> Thanks for your reply.
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


[jira] [Created] (FLINK-15609) Add blink built-in functions from FlinkSqlOperatorTable to BuiltInFunctionDefinitions

2020-01-15 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15609:


 Summary: Add blink built-in functions from FlinkSqlOperatorTable 
to BuiltInFunctionDefinitions
 Key: FLINK-15609
 URL: https://issues.apache.org/jira/browse/FLINK-15609
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Jingsong Lee
 Fix For: 1.10.0


In FLINK-15595, CoreModule should contain all functions in 
FlinkSqlOperatorTable. Otherwise, the resolution order is chaotic. I think it 
is time to align the blink built-in functions with BuiltInFunctionDefinitions.

Impact on the legacy planner: a user can no longer directly use the name of a 
catalog function that shares its name with a blink built-in function. I think 
that is reasonable, since such users will migrate their jobs to the blink 
planner anyway.

What do you think? [~twalthr] [~dwysakowicz]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15608) State Processor api Write new state will get NullPointerException

2020-01-15 Thread HunterHunter (Jira)
HunterHunter created FLINK-15608:


 Summary:  State Processor api Write new state will get 
NullPointerException 
 Key: FLINK-15608
 URL: https://issues.apache.org/jira/browse/FLINK-15608
 Project: Flink
  Issue Type: Bug
Reporter: HunterHunter


When I execute a simple sample program, it sometimes (occasionally) reports this 
error:

Assertion failed: (last_ref), function ~ColumnFamilySet, file 
db/column_family.cc, line 1238.

And when I write new state to a new savepoint path, I get the following (v1.9.0 
is fine):

Caused by: java.lang.NullPointerException
 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
 at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
 at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
 at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Make AppendingState#add refuse to add null element

2020-01-15 Thread Yun Tang
+1 for unifying the behavior of AppendingState#add .

However, I have a concern about the usage of the window reducing function [1]:
I'm not sure whether users rely on processing StreamRecord(null) to clear state.
As you can see, users cannot see the reducing window state directly, and the
only way to interact with that state is via processing records.

I'm not sure whether this is by design. @Aljoscha Krettek, would you please
share the initial idea behind introducing this?


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#reducefunction

Best
Yun Tang


From: Yu Li 
Sent: Thursday, January 9, 2020 14:09
To: dev 
Subject: Re: [DISCUSS] Make AppendingState#add refuse to add null element

+1 for unifying the behavior to refusing adding null element. Nice catch
and thanks for bringing up the discussion!

Best Regards,
Yu


On Wed, 8 Jan 2020 at 22:50, Aljoscha Krettek  wrote:

> Hi,
>
> As I said in the discussion on the Jira issue, I’m in favour of this
> change!
>
> This is the Jira Issue, for reference:
> https://issues.apache.org/jira/browse/FLINK-15424
>
> Best,
> Aljoscha
>
> > On 8. Jan 2020, at 15:16, Congxian Qiu  wrote:
> >
> > Dear All
> >
> >
> > Currently, we found the implementations of AppendingState#add are not the
> > same, taking some as example:
> >
> >   - HeapReducingState will clear state if add null element
> >   - RocksDBReducingState will add null element if serializer can
> serialize
> >   null
> >   - Both HeapListState and RocksDBListState refuse to add null element —
> >   will throw NullPointException
> >
> >
> > we think this need to be fixed, and possible solutions include:
> >
> >   1. Respect the current java doc, which said “If null is passed in, the
> >   state value will remain unchanged”
> >   2. Make all AppendingState#add refuse to add null element
> >
> >
> > We propose to apply the second solution, following the recommendation in
> > Guava[1].
> >
> >
> > Would love to hear your thoughts. Thanks.
> >
> >
> > Regards,
> >
> > Congxian
> >
> >
> > [1] https://github.com/google/guava/wiki/UsingAndAvoidingNullExplained
>
>
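
For reference, a minimal illustrative sketch of what option 2 above (refusing
null elements) could look like; this is a toy stand-alone example, not the
actual Flink patch:

import java.util.ArrayList;
import java.util.List;

// A list-like appending state that fails fast on null, instead of clearing
// state (today's heap ReducingState) or serializing null (today's RocksDB
// ReducingState).
public class NullRejectingAppendingState<T> {
    private final List<T> elements = new ArrayList<>();

    public void add(T value) {
        if (value == null) {
            throw new NullPointerException("You cannot add null to an AppendingState.");
        }
        elements.add(value);
    }

    public Iterable<T> get() {
        return elements;
    }
}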


Re: Please give me the permission as a contributor

2020-01-15 Thread Congxian Qiu
Welcome to the Flink community!
You no longer need contributor permissions to open JIRA tickets. You can
simply open a JIRA ticket and ask a committer to assign you to it and start
working on it.
Please check the Flink's contribution guidelines [1] for more details.

[1] https://flink.apache.org/contributing/how-to-contribute.html

Best,
Congxian


m...@lishiyu.cn  于2020年1月15日周三 下午5:12写道:

> Hi Guys,
>
> I want to contribute to Apache Flink.
> Would you please give me the permission as a contributor?
> My JIRA ID is lishiyu.
>
>
> m...@lishiyu.cn
>


Re: [DISCUSS] FLIP-85: Delayed Job Graph Generation

2020-01-15 Thread Yang Wang
Hi all,

Thanks a lot for the feedback from @Kostas Kloudas. All your concerns are
on point. FLIP-85 is mainly focused on supporting cluster mode for per-job,
since that is more urgent and has many more use cases in both Yarn and
Kubernetes deployments. For the session cluster, we could have more
discussion in a new thread later.

#1, How to download the user jars and dependencies for per-job in cluster
mode?
For Yarn, we could register the user jars and dependencies as LocalResources.
They will be distributed by Yarn, and once the JobManager and TaskManagers
are launched, the jars already exist locally.
For standalone per-job and K8s, we expect the user jars and dependencies to
be built into the image. Alternatively, an init container could be used for
downloading. This is natively distributed, so we will not have a bottleneck.

#2, Job graph recovery
We could have an optimization to store the job graph on a DFS. However, I
suggest that building a new JobGraph from the configuration be the default
option, since we will not always have a DFS available when deploying a Flink
per-job cluster. Of course, we assume that using the same configuration
(e.g. job_id, user_jar, main_class, main_args, parallelism,
savepoint_settings, etc.) will produce the same job graph. I think standalone
per-job already has similar behavior.

#3, What happens with jobs that have multiple execute calls?
Currently, this is really a problem. Even if we use a local client on the
Flink master side, it will behave differently from client mode. In client
mode, if execute() is called multiple times, we will deploy a separate Flink
cluster for each call. I am not sure whether that is reasonable. However, I
still think using the local client is a good choice. We could continue the
discussion in a new thread. @Zili Chen, do you want to drive this?



Best,
Yang

Peter Huang  于2020年1月16日周四 上午1:55写道:

> Hi Kostas,
>
> Thanks for this feedback. I couldn't agree more. The cluster mode should be
> added first for the per-job cluster.
>
> 1) For the job cluster implementation
> 1. Job graph recovery: either rebuild it from the configuration or store a
> static job graph as the session cluster does. I think the static one will
> be better for shorter recovery time.
> Let me update the doc for details.
>
> 2. For jobs that execute multiple times, I think @Zili Chen
> has proposed the local client solution that can actually run the program in
> the cluster entry point. We can put the implementation in the second stage,
> or even into a new FLIP for further discussion.
>
> 2) For the session cluster implementation
> We can disable the cluster mode for the session cluster in the first stage.
> I agree the jar downloading will be a painful thing.
> We can consider a PoC and performance evaluation first. If the end-to-end
> experience is good enough, then we can consider proceeding with the
> solution.
>
> Looking forward to more opinions from @Yang Wang  @Zili
> Chen  @Dian Fu .
>
>
> Best Regards
> Peter Huang
>
> On Wed, Jan 15, 2020 at 7:50 AM Kostas Kloudas  wrote:
>
>> Hi all,
>>
>> I am writing here as the discussion on the Google Doc seems to be a
>> bit difficult to follow.
>>
>> I think that in order to be able to make progress, it would be helpful
>> to focus on per-job mode for now.
>> The reason is that:
>>  1) making the (unique) JobSubmitHandler responsible for creating the
>> jobgraphs,
>>   which includes downloading dependencies, is not an optimal solution
>>  2) even if we put the responsibility on the JobMaster, currently each
>> job has its own
>>   JobMaster but they all run on the same process, so we have again a
>> single entity.
>>
>> Of course after this is done, and if we feel comfortable with the
>> solution, then we can go to the session mode.
>>
>> A second comment has to do with fault-tolerance in the per-job,
>> cluster-deploy mode.
>> In the document, it is suggested that upon recovery, the JobMaster of
>> each job re-creates the JobGraph.
>> I am just wondering if it is better to create and store the jobGraph
>> upon submission and only fetch it
>> upon recovery so that we have a static jobGraph.
>>
>> Finally, I have a question which is what happens with jobs that have
>> multiple execute calls?
>> The semantics seem to change compared to the current behaviour, right?
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Jan 8, 2020 at 8:05 PM tison  wrote:
>> >
>> > not always, Yang Wang is also not yet a committer but he can join the
>> > channel. I cannot find the id by clicking “Add new member in channel” so
>> > come to you and ask for try out the link. Possibly I will find other
>> ways
>> > but the original purpose is that the slack channel is a public area we
>> > discuss about developing...
>> > Best,
>> > tison.
>> >
>> >
>> > Peter Huang  于2020年1月9日周四 上午2:44写道:
>> >
>> > > Hi Tison,
>> > >
>> > > I am not the committer of Flink yet. I think I can't join it also.
>> > >
>> > >
>> > > Best Regards
>> > > Peter Huang
>> > >
>> > > On Wed, Jan 8, 2020 at 9:39 AM tison  wrote:
>> > >
>> > > > Hi 

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2020-01-15 Thread Steven Wu
Becket, is FLIP-27 still on track to be released in 1.10?

On Tue, Jan 7, 2020 at 7:04 PM Becket Qin  wrote:

> Hi folks,
>
> Happy new year!
>
> Stephan and I chatted offline yesterday. After reading the email thread
> again, I found that I have misunderstood Dawid's original proposal
> regarding the behavior of env.source(BoundedSource) and had an incorrect
> impression about the behavior of java covariant return type.
> Anyways, I agree what Dawid originally proposed makes sense, which is the
> following API:
>
> // Return a BoundedDataStream instance if the source is bounded.
> // Return a DataStream instance if the source is unbounded.
> DataStream env.source(Source);
>
> // Throws exception if the source is unbounded.
> // Used when the user knows the source is bounded at programming time.
> BoundedDataStream env.boundedSource(Source);
>
> A BoundedDataStream only runs in batch execution mode.
> A DataStream only runs in streaming execution mode.
>
> To run a bounded source in streaming execution mode, one would do the
> following:
>
> // Return a DataStream instance with a source that will stop at some point;
> DataStream env.source(SourceUtils.asUnbounded(myBoundedSource));
>
> I'll update the FLIP wiki and resume the vote if there is no further
> concerns.
>
> Apologies for the misunderstanding and thanks for all the patient
> discussions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Dec 23, 2019 at 8:00 AM Becket Qin  wrote:
>
> > Hi Steven,
> >
> > I think the current proposal is what you mentioned - a Kafka source that
> > can be constructed in either BOUNDED or UNBOUNDED mode. And Flink can get
> > the boundedness by invoking getBoundedness().
> >
> > So one can create a Kafka source by doing something like the following:
> >
> > new KafkaSource().startOffset().endOffset(); // A bounded instance.
> > new KafkaSource().startOffset(); // An unbounded instance.
> >
> > If users want to have an UNBOUNDED Kafka source that stops at some point,
> > they can wrap the BOUNDED Kafka source like below:
> >
> > SourceUtils.asUnbounded(new KafkaSource().startOffset().endOffset());
> >
> > The wrapped source would be an unbounded Kafka source that stops at the
> > end offset.
> >
> > Does that make sense?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Dec 20, 2019 at 1:31 PM Jark Wu  wrote:
> >
> >> Hi,
> >>
> >> First of all, I think it is not called "UNBOUNDED"; according to
> >> FLIP-27, it is called "CONTINUOUS_UNBOUNDED".
> >> The description of Boundedness in FLIP-27 [1] states clearly what
> >> Becket and I think.
> >>
> >> public enum Boundedness {
> >>
> >>     /**
> >>      * A bounded source processes the data that is currently available
> >>      * and will end after that.
> >>      *
> >>      * When a source produces a bounded stream, the runtime may activate
> >>      * additional optimizations that are suitable only for bounded input.
> >>      * Incorrectly producing unbounded data when the source is set to
> >>      * produce a bounded stream will often result in programs that do not
> >>      * output any results and may eventually fail due to runtime errors
> >>      * (out of memory or storage).
> >>      */
> >>     BOUNDED,
> >>
> >>     /**
> >>      * A continuous unbounded source continuously processes all data as
> >>      * it comes.
> >>      *
> >>      * The source may run forever (until the program is terminated) or
> >>      * might actually end at some point, based on some source-specific
> >>      * conditions. Because that is not transparent to the runtime, the
> >>      * runtime will use an execution mode for continuous unbounded
> >>      * streams whenever this mode is chosen.
> >>      */
> >>     CONTINUOUS_UNBOUNDED
> >> }
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP-27:RefactorSourceInterface-Source
> >>
> >>
> >>
> >> On Fri, 20 Dec 2019 at 12:55, Steven Wu  wrote:
> >>
> >> > Becket,
> >> >
> >> > Regarding "UNBOUNDED source that stops at some point", I found it
> >> difficult
> >> > to grasp what UNBOUNDED really mean.
> >> >
> >> > If we want to use a Kafka source with an end/stop time, I guess you
> >> > call it an UNBOUNDED Kafka source that stops (aka BOUNDED-streaming).
> >> > The terminology is a little confusing to me. Maybe BOUNDED/UNBOUNDED
> >> > shouldn't be used to categorize sources. Just call it a Kafka source
> >> > and it can run in either BOUNDED or UNBOUNDED mode.
> >> >
> >> > Thanks,
> >> > Steven
> >> >
> >> > On Thu, Dec 19, 2019 at 7:02 PM Becket Qin 
> >> wrote:
> >> >
> >> > > I had an offline chat with Jark, and here are some more thoughts:
> >> > >
> >> > > 1. From the SQL perspective, a BOUNDED source leads to the batch
> >> > > execution mode, and an UNBOUNDED source leads to the streaming
> >> > > execution mode.
> >> > > 2. The semantic of an UNBOUNDED source is that it may or may not
> >> > > stop. The semantic
> >> 

Flink Sql Join, how to clear the sql join state?

2020-01-15 Thread LakeShen
Hi community, I am now using Flink SQL inner join in my code. I saw in the Flink
documentation that the Flink SQL inner join will keep both sides of the join
input in Flink’s state forever.
As a result, the HDFS files grow very big. Is there any way to clear the SQL
join state?
Thanks for your reply.


Re: [DISCUSS] Improve TableFactory

2020-01-15 Thread Bowen Li
Hi Jingsong,

The 1st and 2nd pain points you described are very valid, as I'm more
familiar with them. I agree these are shortcomings of the current Flink SQL
design.

A couple comments on your 1st proposal:

1. Is it better to have explicit APIs like "createBatchTableSource(...)"
and "createStreamingTableSource(...)" in TableSourceFactory (and similarly
for the sink factory) to let the planner decide which mode (streaming vs
batch) of source should be instantiated? That way connector developers don't
always have to handle an if-else on isStreamingMode.
2. I'm not sure of the benefit of having a CatalogTableContext class. The
path, table, and config are fairly independent of each other, so why not
pass the config in as a 3rd parameter, as in `createXxxTableSource(path,
catalogTable, tableConfig)`? A rough sketch of this alternative follows below.
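
Something along these lines could work - a rough, hypothetical sketch of the
alternative from points 1 and 2 above (the method names and parameter list are
illustrative, not an agreed-upon API):

public interface TableSourceFactory<T> extends TableFactory {

   // Called by the planner when the query runs in batch execution mode.
   TableSource<T> createBatchTableSource(
         ObjectPath tablePath, CatalogTable table, ReadableConfig tableConfig);

   // Called by the planner when the query runs in streaming execution mode.
   TableSource<T> createStreamingTableSource(
         ObjectPath tablePath, CatalogTable table, ReadableConfig tableConfig);
}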


On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li  wrote:

> Hi dev,
>
> I'd like to kick off a discussion on the improvement of TableSourceFactory
> and TableSinkFactory.
>
> Motivation:
> Now the main needs and problems are:
> 1.Connector can't get TableConfig [1], and some behaviors really need to be
> controlled by the user's table configuration. In the era of catalog, we
> can't put these config in connector properties, which is too inconvenient.
> 2.Connector can't know if this is batch or stream execution mode. But the
> sink implementation of batch and stream is totally different. I understand
> there is an update mode property now, but it splits the batch and stream in
> the catalog dimension. In fact, this information can be obtained through
> the current TableEnvironment.
> 3.No interface to call validation. Right now our validation lives in util
> classes, and it depends on whether or not the connector calls them. We have
> some new validations to add, such as [2], which is really confusing to
> users and even developers. Another problem is that our SQL update (DDL)
> does not have validation [3]. It is better to report an error when
> executing DDL, otherwise it will confuse the user.
>
> Proposed change draft for 1 and 2:
>
> interface CatalogTableContext {
>ObjectPath getTablePath();
>CatalogTable getTable();
>ReadableConfig getTableConfig();
>boolean isStreamingMode();
> }
>
> public interface TableSourceFactory extends TableFactory {
>
>default TableSource createTableSource(CatalogTableContext context) {
>   return createTableSource(context.getTablePath(), context.getTable());
>}
>
>..
> }
>
> Proposed change draft for 3:
>
> public interface TableFactory {
>
>TableValidators validators();
>
>interface TableValidators {
>   ConnectorDescriptorValidator connectorValidator();
>   TableSchemaValidator schemaValidator();
>   FormatDescriptorValidator formatValidator();
>}
> }
>
> What do you think?
>
> [1] https://issues.apache.org/jira/browse/FLINK-15290
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> [3] https://issues.apache.org/jira/browse/FLINK-15509
>
> Best,
> Jingsong Lee
>


[jira] [Created] (FLINK-15607) throw exception when users trying to use Hive aggregate functions in streaming mode

2020-01-15 Thread Bowen Li (Jira)
Bowen Li created FLINK-15607:


 Summary: throw exception when users trying to use Hive aggregate 
functions in streaming mode
 Key: FLINK-15607
 URL: https://issues.apache.org/jira/browse/FLINK-15607
 Project: Flink
  Issue Type: Task
  Components: Connectors / Hive, Table SQL / API
Reporter: Bowen Li


It seems we need to distinguish the execution mode in FunctionCatalogOperatorTable, 
which is not achievable yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-85: Delayed Job Graph Generation

2020-01-15 Thread Peter Huang
Hi Kostas,

Thanks for this feedback. I couldn't agree more. The cluster mode should be
added first for the per-job cluster.

1) For the job cluster implementation
1. Job graph recovery: either rebuild it from the configuration or store a
static job graph as the session cluster does. I think the static one will be
better for shorter recovery time.
Let me update the doc for details.

2. For jobs that execute multiple times, I think @Zili Chen
has proposed the local client solution that can actually run the program in
the cluster entry point. We can put the implementation in the second stage,
or even into a new FLIP for further discussion.

2) For the session cluster implementation
We can disable the cluster mode for the session cluster in the first stage.
I agree the jar downloading will be a painful thing.
We can consider a PoC and performance evaluation first. If the end-to-end
experience is good enough, then we can consider proceeding with the
solution.

Looking forward to more opinions from @Yang Wang  @Zili
Chen  @Dian Fu .


Best Regards
Peter Huang

On Wed, Jan 15, 2020 at 7:50 AM Kostas Kloudas  wrote:

> Hi all,
>
> I am writing here as the discussion on the Google Doc seems to be a
> bit difficult to follow.
>
> I think that in order to be able to make progress, it would be helpful
> to focus on per-job mode for now.
> The reason is that:
>  1) making the (unique) JobSubmitHandler responsible for creating the
> jobgraphs,
>   which includes downloading dependencies, is not an optimal solution
>  2) even if we put the responsibility on the JobMaster, currently each
> job has its own
>   JobMaster but they all run on the same process, so we have again a
> single entity.
>
> Of course after this is done, and if we feel comfortable with the
> solution, then we can go to the session mode.
>
> A second comment has to do with fault-tolerance in the per-job,
> cluster-deploy mode.
> In the document, it is suggested that upon recovery, the JobMaster of
> each job re-creates the JobGraph.
> I am just wondering if it is better to create and store the jobGraph
> upon submission and only fetch it
> upon recovery so that we have a static jobGraph.
>
> Finally, I have a question which is what happens with jobs that have
> multiple execute calls?
> The semantics seem to change compared to the current behaviour, right?
>
> Cheers,
> Kostas
>
> On Wed, Jan 8, 2020 at 8:05 PM tison  wrote:
> >
> > not always, Yang Wang is also not yet a committer but he can join the
> > channel. I cannot find the id by clicking “Add new member in channel” so
> > come to you and ask for try out the link. Possibly I will find other ways
> > but the original purpose is that the slack channel is a public area we
> > discuss about developing...
> > Best,
> > tison.
> >
> >
> > Peter Huang  于2020年1月9日周四 上午2:44写道:
> >
> > > Hi Tison,
> > >
> > > I am not the committer of Flink yet. I think I can't join it also.
> > >
> > >
> > > Best Regards
> > > Peter Huang
> > >
> > > On Wed, Jan 8, 2020 at 9:39 AM tison  wrote:
> > >
> > > > Hi Peter,
> > > >
> > > > Could you try out this link?
> > > https://the-asf.slack.com/messages/CNA3ADZPH
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > >
> > > > Peter Huang  于2020年1月9日周四 上午1:22写道:
> > > >
> > > > > Hi Tison,
> > > > >
> > > > > I can't join the group with shared link. Would you please add me
> into
> > > the
> > > > > group? My slack account is huangzhenqiu0825.
> > > > > Thank you in advance.
> > > > >
> > > > >
> > > > > Best Regards
> > > > > Peter Huang
> > > > >
> > > > > On Wed, Jan 8, 2020 at 12:02 AM tison 
> wrote:
> > > > >
> > > > > > Hi Peter,
> > > > > >
> > > > > > As described above, this effort should get attention from people
> > > > > developing
> > > > > > FLIP-73 a.k.a. Executor abstractions. I recommend you to join the
> > > > public
> > > > > > slack channel[1] for Flink Client API Enhancement and you can
> try to
> > > > > share
> > > > > > you detailed thoughts there. It possibly gets more concrete
> > > attentions.
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://slack.com/share/IS21SJ75H/Rk8HhUly9FuEHb7oGwBZ33uL/enQtODg2MDYwNjE5MTg3LTA2MjIzNDc1M2ZjZDVlMjdlZjk1M2RkYmJhNjAwMTk2ZDZkODQ4NmY5YmI4OGRhNWJkYTViMTM1NzlmMzc4OWM
> > > > > >
> > > > > >
> > > > > > Peter Huang  于2020年1月7日周二 上午5:09写道:
> > > > > >
> > > > > > > Dear All,
> > > > > > >
> > > > > > > Happy new year! According to existing feedback from the
> community,
> > > we
> > > > > > > revised the doc with the consideration of session cluster
> support,
> > > > and
> > > > > > > concrete interface changes needed and execution plan. Please
> take
> > > one
> > > > > > more
> > > > > > > round of review at your most convenient time.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit#
> > > > > > >
> > > > > > >
> > > > > > > Best Regards
> > > > > > > 

Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-15 Thread Till Rohrmann
I'd be fine with these changes. Thanks for the summary Xintong.

Cheers,
Till
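
For reference (a sketch only, derived from the action items summarized in the
quoted mail below; all other memory options keep their current defaults), the
new defaults amount to the following configuration values:

taskmanager.memory.process.size: 1568m
taskmanager.memory.jvm-metaspace.size: 96m
taskmanager.memory.jvm-overhead.min: 192m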

On Wed, Jan 15, 2020 at 11:09 AM Xintong Song  wrote:

> Thank you all for the well discussion.
>
> If there's no further concerns or objections, I would like to conclude this
> thread into the following action items.
>
>- Change default value of "taskmanager.memory.jvm-overhead.min" to
> 192MB.
>- Change default value of "taskmanager.memory.jvm-metaspace.size" to
>96MB.
>- Change the value of "taskmanager.memory.process.size" in the default
>"flink-conf.yaml" to 1568MB.
>- Relax JVM overhead sanity check, so that the fraction does not need to
>be strictly followed, as long as the min/max range is respected.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 15, 2020 at 5:50 PM Xintong Song 
> wrote:
>
> > There's more idea from offline discussion with Andrey.
> >
> > If we decide to make metaspace 96MB, we can also make process.size 1568MB
> > (1.5G + 32MB).
> > According to the spreadsheet
> > <
> https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE/edit#gid=0
> >,
> > 1.5GB process size and 64MB metaspace result in memory sizes with the
> > values to be powers of 2.
> > When increasing the metaspace from 64MB to 96MB, it would be good to
> > preserve that alignment, for better readability that later we explain the
> > memory configuration and calculations in documents.
> > I believe it's not a big difference between 1.5GB and 1.5GB + 32 MB for
> > memory consumption.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Jan 15, 2020 at 11:55 AM Xintong Song 
> > wrote:
> >
> >> Thanks for the discussion, Stephan, Till and Andrey.
> >>
> >> +1 for the managed fraction (0.4) and process.size (1.5G).
> >>
> >> *JVM overhead min 196 -> 192Mb (128 + 64)*
> >>> small correction for better power 2 alignment of sizes
> >>>
> >> Sorry, this was a typo (and the same for the jira comment which is
> >> copy-pasted). It was 192mb used in the tuning report.
> >>
> >> *meta space at least 96Mb?*
> >>> There is still a concern about JVM metaspace being just 64Mb.
> >>> We should confirm that it is not a problem by trying to test it also
> with
> >>> the SQL jobs, Blink planner.
> >>> Also, by running tpc-ds e2e Flink tests with this setting. Basically,
> >>> where
> >>> more classes are generated/loaded.
> >>> We can look into this tomorrow.
> >>>
> >> I have already tried the setting metaspace to 64Mb with the e2e tests,
> >> where I believe various sql / blink / tpc-ds test cases are included.
> (See
> >> https://travis-ci.com/flink-ci/flink/builds/142970113 )
> >> However, I'm also ok with 96Mb, since we are increasing the process.size
> >> to 1.5G.
> >> My original concern for having larger metaspace size was that we may
> >> result in too small flink.size for out-of-box configuration on
> >> containerized setups.
> >>
> >> *sanity check of JVM overhead*
> >>> When the explicitly configured process and flink memory sizes are
> >>> verified
> >>> with the JVM meta space and overhead,
> >>> JVM overhead does not have to be the exact fraction.
> >>> It can be just within its min/max range, similar to how it is now for
> >>> network/shuffle memory check after FLINK-15300.
> >>>
> >> Also +1 for this.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Wed, Jan 15, 2020 at 6:16 AM Andrey Zagrebin 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> Stephan, Till and me had another offline discussion today. Here is the
> >>> outcome of our brainstorm.
> >>>
> >>> *managed fraction 0.4*
> >>> just confirmed what we already discussed here.
> >>>
> >>> *process.size = 1536Mb (1,5Gb)*
> >>> We agreed to have process.size in the default settings with the
> >>> explanation
> >>> of flink.size alternative in the comment.
> >>> The suggestion is to increase it from 1024 to 1536mb. As you can see in
> >>> the
> >>> earlier provided calculation spreadsheet,
> >>> it will result in bigger JVM Heap and managed memory (both ~0.5Gb) for
> >>> all
> >>> new setups.
> >>> This should provide good enough experience for trying out Flink.
> >>>
> >>> *JVM overhead min 196 -> 192Mb (128 + 64)*
> >>> small correction for better power 2 alignment of sizes
> >>>
> >>> *meta space at least 96Mb?*
> >>> There is still a concern about JVM metaspace being just 64Mb.
> >>> We should confirm that it is not a problem by trying to test it also
> with
> >>> the SQL jobs, Blink planner.
> >>> Also, by running tpc-ds e2e Flink tests with this setting. Basically,
> >>> where
> >>> more classes are generated/loaded.
> >>> We can look into this tomorrow.
> >>>
> >>> *sanity check of JVM overhead*
> >>> When the explicitly configured process and flink memory sizes are
> >>> verified
> >>> with the JVM meta space and overhead,
> >>> JVM overhead does not have to be the exact fraction.
> >>> It can be just within its min/max range, similar to how it is now for
> >>> network/shuffle memory check after 

[jira] [Created] (FLINK-15606) Deprecate enable default background cleanup of state with TTL

2020-01-15 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-15606:
---

 Summary: Deprecate enable default background cleanup of state with 
TTL
 Key: FLINK-15606
 URL: https://issues.apache.org/jira/browse/FLINK-15606
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin
 Fix For: 1.10.0


Follow-up for FLINK-14898.

Enabling TTL without any background cleanup does not make too much
 sense. So we can keep it always enabled, just cleanup settings can be
 tweaked for particular backends.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-85: Delayed Job Graph Generation

2020-01-15 Thread Kostas Kloudas
Hi all,

I am writing here as the discussion on the Google Doc seems to be a
bit difficult to follow.

I think that in order to be able to make progress, it would be helpful
to focus on per-job mode for now.
The reason is that:
 1) making the (unique) JobSubmitHandler responsible for creating the jobgraphs,
  which includes downloading dependencies, is not an optimal solution
 2) even if we put the responsibility on the JobMaster, currently each
job has its own
  JobMaster but they all run on the same process, so we have again a
single entity.

Of course after this is done, and if we feel comfortable with the
solution, then we can go to the session mode.

A second comment has to do with fault-tolerance in the per-job,
cluster-deploy mode.
In the document, it is suggested that upon recovery, the JobMaster of
each job re-creates the JobGraph.
I am just wondering if it is better to create and store the jobGraph
upon submission and only fetch it
upon recovery so that we have a static jobGraph.

Finally, I have a question which is what happens with jobs that have
multiple execute calls?
The semantics seem to change compared to the current behaviour, right?

Cheers,
Kostas

On Wed, Jan 8, 2020 at 8:05 PM tison  wrote:
>
> not always, Yang Wang is also not yet a committer but he can join the
> channel. I cannot find the id by clicking “Add new member in channel” so
> come to you and ask for try out the link. Possibly I will find other ways
> but the original purpose is that the slack channel is a public area we
> discuss about developing...
> Best,
> tison.
>
>
> Peter Huang  于2020年1月9日周四 上午2:44写道:
>
> > Hi Tison,
> >
> > I am not the committer of Flink yet. I think I can't join it also.
> >
> >
> > Best Regards
> > Peter Huang
> >
> > On Wed, Jan 8, 2020 at 9:39 AM tison  wrote:
> >
> > > Hi Peter,
> > >
> > > Could you try out this link?
> > https://the-asf.slack.com/messages/CNA3ADZPH
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Peter Huang  于2020年1月9日周四 上午1:22写道:
> > >
> > > > Hi Tison,
> > > >
> > > > I can't join the group with shared link. Would you please add me into
> > the
> > > > group? My slack account is huangzhenqiu0825.
> > > > Thank you in advance.
> > > >
> > > >
> > > > Best Regards
> > > > Peter Huang
> > > >
> > > > On Wed, Jan 8, 2020 at 12:02 AM tison  wrote:
> > > >
> > > > > Hi Peter,
> > > > >
> > > > > As described above, this effort should get attention from people
> > > > developing
> > > > > FLIP-73 a.k.a. Executor abstractions. I recommend you to join the
> > > public
> > > > > slack channel[1] for Flink Client API Enhancement and you can try to
> > > > share
> > > > > you detailed thoughts there. It possibly gets more concrete
> > attentions.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> > https://slack.com/share/IS21SJ75H/Rk8HhUly9FuEHb7oGwBZ33uL/enQtODg2MDYwNjE5MTg3LTA2MjIzNDc1M2ZjZDVlMjdlZjk1M2RkYmJhNjAwMTk2ZDZkODQ4NmY5YmI4OGRhNWJkYTViMTM1NzlmMzc4OWM
> > > > >
> > > > >
> > > > > Peter Huang  于2020年1月7日周二 上午5:09写道:
> > > > >
> > > > > > Dear All,
> > > > > >
> > > > > > Happy new year! According to existing feedback from the community,
> > we
> > > > > > revised the doc with the consideration of session cluster support,
> > > and
> > > > > > concrete interface changes needed and execution plan. Please take
> > one
> > > > > more
> > > > > > round of review at your most convenient time.
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit#
> > > > > >
> > > > > >
> > > > > > Best Regards
> > > > > > Peter Huang
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Jan 2, 2020 at 11:29 AM Peter Huang <
> > > > huangzhenqiu0...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Dian,
> > > > > > > Thanks for giving us valuable feedbacks.
> > > > > > >
> > > > > > > 1) It's better to have a whole design for this feature
> > > > > > > For the suggestion of enabling the cluster mode also session
> > > > cluster, I
> > > > > > > think Flink already supported it. WebSubmissionExtension already
> > > > allows
> > > > > > > users to start a job with the specified jar by using web UI.
> > > > > > > But we need to enable the feature from CLI for both local jar,
> > > remote
> > > > > > jar.
> > > > > > > I will align with Yang Wang first about the details and update
> > the
> > > > > design
> > > > > > > doc.
> > > > > > >
> > > > > > > 2) It's better to consider the convenience for users, such as
> > > > debugging
> > > > > > >
> > > > > > > I am wondering whether we can store the exception in jobgragh
> > > > > > > generation in application master. As no streaming graph can be
> > > > > scheduled
> > > > > > in
> > > > > > > this case, there will be no more TM will be requested from
> > FlinkRM.
> > > > > > > If the AM is still running, users can still query it from CLI. As
> > > it
> > > > > > > 

[jira] [Created] (FLINK-15605) Remove deprecated in 1.9 StateTtlConfig.TimeCharacteristic

2020-01-15 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-15605:
---

 Summary: Remove deprecated in 1.9 StateTtlConfig.TimeCharacteristic
 Key: FLINK-15605
 URL: https://issues.apache.org/jira/browse/FLINK-15605
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15604) Error for 504 returns

2020-01-15 Thread LCID Fire (Jira)
LCID Fire created FLINK-15604:
-

 Summary: Error for 504 returns
 Key: FLINK-15604
 URL: https://issues.apache.org/jira/browse/FLINK-15604
 Project: Flink
  Issue Type: Bug
Reporter: LCID Fire


I get the errors:
{noformat}
Jan 15, 2020 4:02:24 PM org.apache.flink.runtime.rest.RestClient$ClientHandler readRawResponse
SEVERE: Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Unexpected character ('<' (code 60)): expected a valid value (number, String, 
array, object, 'true', 'false' or 'null') at [Source: 
(org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream); line: 1, 
column: 2] at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2630)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2497)
 at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:513)
 at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:461)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:384)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
 at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:282)
 at 

Let Flink SQL PlannerExpressionParserImpl#FieldRefrence use Unicode as its default charset

2020-01-15 Thread ??????
Hi all,
the related issue: https://issues.apache.org/jira/browse/FLINK-15573


As the title says, what I want to do is let `FieldRefrence` use Unicode as its
default charset (or maybe as an optional charset which can be configured).
According to `PlannerExpressionParserImpl`, FLINK currently uses JavaIdentifier
as the default charset for `FieldRefrence`. But, from my perspective, that is
not enough. Consider a user who uses ElasticSearch as a sink: we all know that
ES has a field called `@timestamp`, which JavaIdentifier cannot express.


So in my team, we let `PlannerExpressionParserImpl#FieldRefrence` use Unicode
as its default charset, which solves this kind of problem. (Please refer to
the issue I mentioned above.)


In my opinion, the change is of general purpose:
Firstly, MySQL supports Unicode as the default field charset (see the field
named `@@`), so shall we support Unicode as well?


What's more, my team really gets a lot of benefit from this change. I also
believe that it can give other users more benefits without any harm!
Fortunately, the change is fully compatible with existing programs, because
Unicode is a superset of JavaIdentifier. Only a few code changes are needed
to achieve this goal.
Looking forward to any opinions.

btw, thanks to tison~





Best Regards
Shoi Liu








[jira] [Created] (FLINK-15603) Show "barrier lag" in checkpoint statistics

2020-01-15 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-15603:


 Summary: Show "barrier lag" in checkpoint statistics
 Key: FLINK-15603
 URL: https://issues.apache.org/jira/browse/FLINK-15603
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Stephan Ewen
 Fix For: 1.11.0


One of the most important metrics is missing in the checkpoint stats: "barrier 
lag", meaning the time between when the checkpoint was triggered and when the 
barriers arrive at a task.

That time is critical to identify if a checkpoint takes too long because of 
backpressure or other contention.

You can implicitly calculate this by "end_to_end_time - sync_time - 
async_time", but it is much more obvious for users that something is up when 
this number is explicitly shown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15602) Blink planner does not respect the precision when casting timestamp to varchar

2020-01-15 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-15602:


 Summary: Blink planner does not respect the precision when casting 
timestamp to varchar
 Key: FLINK-15602
 URL: https://issues.apache.org/jira/browse/FLINK-15602
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Dawid Wysakowicz
 Fix For: 1.10.0


According to SQL 2011 Part 2 Section 6.13 General Rules 11) d)

{quote}
If SD is a datetime data type or an interval data type then let Y be the 
shortest character string that
conforms to the definition of  in Subclause 5.3, “”, and such 
that the interpreted value
of Y is SV and the interpreted precision of Y is the precision of SD.
{quote}

That means:
{code}
select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') 
as TIMESTAMP(0)) as VARCHAR(256)) from ...;
// should produce
// 2014-07-02 06:14:00

select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') 
as TIMESTAMP(3)) as VARCHAR(256)) from ...;
// should produce
// 2014-07-02 06:14:00.000

select cast(cast(TO_TIMESTAMP('2014-07-02 06:14:00', '-MM-DD HH24:mm:SS') 
as TIMESTAMP(9)) as VARCHAR(256)) from ...;
// should produce
// 2014-07-02 06:14:00.000000000
{code}

One possible solution would be to propagate the precision in 
{{org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#localTimeToStringCode}}.
 If I am not mistaken this problem was introduced in [FLINK-14599]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15601) Remove useless constant field NUM_STOP_CALL_TRIES in Execution

2020-01-15 Thread vinoyang (Jira)
vinoyang created FLINK-15601:


 Summary: Remove useless constant field NUM_STOP_CALL_TRIES in 
Execution
 Key: FLINK-15601
 URL: https://issues.apache.org/jira/browse/FLINK-15601
 Project: Flink
  Issue Type: Wish
  Components: Runtime / Task
Reporter: vinoyang


Currently, the constant field {{NUM_STOP_CALL_TRIES}} in {{Execution}} is not 
used. IMO, we can remove it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15600) Further relax the UDF constraints for Java classes

2020-01-15 Thread Timo Walther (Jira)
Timo Walther created FLINK-15600:


 Summary: Further relax the UDF constraints for Java classes
 Key: FLINK-15600
 URL: https://issues.apache.org/jira/browse/FLINK-15600
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-12283 already relaxed the UDF constraints for classes which is a big 
usability improvement for Scala users. However, Java users are still facing 
issues when using anonymous inner classes.

We should allow the following:
{code}
tEnv.registerFunction("testi", new ScalarFunction() {
public String eval(Integer i) {
return String.valueOf(i);
}
});
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-15 Thread HuWeihua
Hi, Andrey

Thanks for your response.

I have checked this Jira ticket and I think it can work in standalone mode, 
where TaskManagers have been started before tasks are scheduled.
But we are currently running Flink on Yarn in per-job cluster mode.

I noticed that this issue has already been raised. I will keep watching this 
ticket. 

Thanks again.

Best
Weihua Hu

> 2020年1月15日 17:53,Andrey Zagrebin  写道:
> 
> HI HuWeihua,
> 
> I think your issue should be resolved with 1.9.2 and 1.10 (not released but in 
> progress).
> You can check the related Jira ticket [1].
> 
> Best,
> Andrey
> 
> [1] https://jira.apache.org/jira/browse/FLINK-12122 
> 
> On Wed, Jan 15, 2020 at 10:08 AM HuWeihua  > wrote:
> Hi, All
> We encountered some problems during the upgrade from Flink 1.5 to Flink 1.9. 
> Flink's scheduling strategy has changed. Flink 1.9 prefers centralized 
> scheduling, while Flink 1.5 prefers decentralized scheduling. This change has 
> caused resources imbalance and blocked our upgrade plan. We have thousands of 
> jobs that need to be upgraded.
> 
> For example,
> There is a job with 10 sources and 100 sinks. Each source need 1 core and 
> each sink need 0.1 core.
> Try to run this job on Yarn, configure the numberOfTaskSlots is 10, 
> yarn.containers.vcores is 2.
> 
> When using Flink-1.5:
> Each TaskManager will run 1 source and 9 sinks, they need 1.9 cores totally. 
> So the job with this configuration works very well. The schedule results is 
> shown in Figure 1.
> When using Flink-1.9:
> The 10 sources will be scheduled to one TaskManager  and the 100 sinks will 
> scheduled to other 10 TaskManagers.  The schedule results is shown in Figure 
> 2.
> In this scenario, the TaskManager which run sources need 10 cores, other 
> TaskManagers need 1 cores. But TaskManager must be configured the same, So we 
> need 11 TaskManager with 10 cores. 
> This situation waste (10-2)*11 = 88 cores more than Flink 1.5.
> 
> In addition to the waste of resources, we also encountered other problems 
> caused by centralized scheduling strategy.
> Network bandwidth. Tasks of the same type are scheduled to the one 
> TaskManager, causing too much network traffic on the machine.
> Some jobs need to sink to the local agent. After centralized scheduling, the 
> insufficient processing capacity of the single machine causes a backlog of 
> consumption.
> 
> In summary, we think a decentralized scheduling strategy is necessary.
> 
> 
> Figure 1. Flink 1.5 schedule results
> 
> Figure 2. Flink 1.9 schedule results
> 
> 
> 
> Best
> Weihua Hu
> 



[jira] [Created] (FLINK-15599) SQL client requires both legacy and blink planner to be on the classpath

2020-01-15 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-15599:


 Summary: SQL client requires both legacy and blink planner to be 
on the classpath
 Key: FLINK-15599
 URL: https://issues.apache.org/jira/browse/FLINK-15599
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: Dawid Wysakowicz
 Fix For: 1.10.0


The SQL client directly uses some of the internal classes of the legacy planner, 
so it does not work with only the Blink planner on the classpath.

The internal class that is being used is 
{{org.apache.flink.table.functions.FunctionService}}.

This dependency was introduced in FLINK-13195



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15598) Memory accuracy loss in YarnClusterDescriptor may lead to deployment failure.

2020-01-15 Thread Xintong Song (Jira)
Xintong Song created FLINK-15598:


 Summary: Memory accuracy loss in YarnClusterDescriptor may lead to 
deployment failure.
 Key: FLINK-15598
 URL: https://issues.apache.org/jira/browse/FLINK-15598
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Reporter: Xintong Song
 Fix For: 1.10.0


Currently, YarnClusterDescriptor parses/derives the TM process memory size from 
the configuration, stores it in ClusterSpecification, validates the 
ClusterSpecification, and then writes the memory size back to the configuration.

This logic is unnecessary: the memory validation is already covered by creating 
TaskExecutorResourceSpec from the configuration in TaskExecutorResourceUtils.

Moreover, the memory size is stored in MB in ClusterSpecification. The resulting 
accuracy loss may lead to a memory validation failure, which prevents the cluster 
from being deployed.
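
For illustration, a minimal, self-contained sketch of the rounding problem (plain Java 
with made-up numbers, not the actual YarnClusterDescriptor code):

{code}
public class MemoryAccuracyLossSketch {

    public static void main(String[] args) {
        // A derived TM process size that is not MB-aligned (hypothetical value).
        long derivedBytes = (1024L << 20) + 512;           // 1 GiB + 512 bytes

        long storedMb = derivedBytes >> 20;                // ClusterSpecification keeps whole MBs
        long restoredBytes = storedMb << 20;               // value written back to the configuration

        // restoredBytes is smaller than derivedBytes, so a later validation
        // against the originally derived value can fail.
        System.out.println(derivedBytes - restoredBytes);  // 512 bytes lost
    }
}
{code}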



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Please give me the permission as a contributor

2020-01-15 Thread Zhu Zhu
Welcome to the Flink community!
You no longer need contributor permissions to open JIRA tickets. You can
simply open a JIRA ticket, ask a committer to assign it to you, and start
working on it.
Please check Flink's contribution guidelines [1] for more details.

[1] https://flink.apache.org/contributing/how-to-contribute.html

Thanks,
Zhu Zhu

m...@lishiyu.cn  于2020年1月15日周三 下午5:12写道:

> Hi Guys,
>
> I want to contribute to Apache Flink.
> Would you please give me the permission as a contributor?
> My JIRA ID is lishiyu.
>
>
> m...@lishiyu.cn
>


Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-15 Thread Chesnay Schepler
This is a known issue that will be fixed in 1.9.2/1.10.0; see 
https://issues.apache.org/jira/browse/FLINK-12122 .


On 15/01/2020 10:07, HuWeihua wrote:

Hi, All
We encountered some problems during the upgrade from Flink 1.5 to 
Flink 1.9. Flink's scheduling strategy has changed: Flink 1.9 prefers 
centralized scheduling, while Flink 1.5 prefers decentralized 
scheduling. This change has caused a resource imbalance and blocked our 
upgrade plan. We have thousands of jobs that need to be upgraded.


For example,
There is a job with 10 sources and 100 sinks. Each source needs 1 core 
and each sink needs 0.1 core.
We run this job on YARN, with numberOfTaskSlots set to 10 and 
yarn.containers.vcores set to 2.


When using Flink 1.5:
Each TaskManager runs 1 source and 9 sinks, needing 1.9 cores in 
total, so the job works very well with this configuration. 
The scheduling result is shown in Figure 1.

When using Flink 1.9:
The 10 sources are scheduled to one TaskManager and the 100 sinks 
are scheduled to 10 other TaskManagers. The scheduling result is shown 
in Figure 2.
In this scenario, the TaskManager that runs the sources needs 10 cores, 
while the other TaskManagers need 1 core each. But all TaskManagers must 
be configured the same, so we need 11 TaskManagers with 10 cores each.

This situation wastes (10-2)*11 = 88 cores more than Flink 1.5.

In addition to the waste of resources, we also encountered other 
problems caused by centralized scheduling strategy.


 1. Network bandwidth. Tasks of the same type are scheduled to one
    TaskManager, causing too much network traffic on that machine.

 2. Some jobs need to sink to the local agent. After centralized
    scheduling, the insufficient processing capacity of a single
    machine causes a backlog of consumption.


In summary, we think a decentralized scheduling strategy is necessary.


Figure 1. Flink 1.5 schedule results

Figure 2. Flink 1.9 schedule results



Best
Weihua Hu
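
For what it's worth, here is a minimal sketch of the arithmetic in the example above 
(plain Java using the numbers from the quoted message; illustration only, not Flink 
scheduler code):

    public class SchedulingWasteSketch {
        public static void main(String[] args) {
            int sources = 10, sinks = 100, slotsPerTm = 10;
            int taskManagers = (sources + sinks) / slotsPerTm;   // 11 TMs in both cases

            // Spread-out placement (Flink 1.5 style): ~1 source + 9 sinks per TM
            // => 1 * 1.0 + 9 * 0.1 = 1.9 cores, i.e. 2 vcores per TM.
            int spreadTotalCores = taskManagers * 2;             // 22 cores

            // Clustered placement (Flink 1.9 style): one TM gets all 10 sources,
            // and every TM must be sized like the hottest one (10 vcores).
            int clusteredTotalCores = taskManagers * 10;         // 110 cores

            // (10 - 2) * 11 = 88 extra cores, as stated above.
            System.out.println(clusteredTotalCores - spreadTotalCores);
        }
    }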





Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-15 Thread Xintong Song
Thank you all for the thorough discussion.

If there are no further concerns or objections, I would like to conclude this
thread with the following action items.

   - Change default value of "taskmanager.memory.jvm-overhead.min" to 192MB.
   - Change default value of "taskmanager.memory.jvm-metaspace.size" to
   96MB.
   - Change the value of "taskmanager.memory.process.size" in the default
   "flink-conf.yaml" to 1568MB.
   - Relax JVM overhead sanity check, so that the fraction does not need to
   be strictly followed, as long as the min/max range is respected.
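
To make the outcome concrete, here is a minimal sketch of the resulting default values
(illustration only; the actual defaults ship in the default flink-conf.yaml, and the
config keys are set via plain strings here just for readability):

    import org.apache.flink.configuration.Configuration;

    public class Flip49DefaultsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("taskmanager.memory.process.size", "1568m");      // 1.5 GB + 32 MB
            conf.setString("taskmanager.memory.jvm-metaspace.size", "96m");  // raised from 64 MB
            conf.setString("taskmanager.memory.jvm-overhead.min", "192m");   // 128 MB + 64 MB
            System.out.println(conf);
        }
    }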


Thank you~

Xintong Song



On Wed, Jan 15, 2020 at 5:50 PM Xintong Song  wrote:

> There's more idea from offline discussion with Andrey.
>
> If we decide to make metaspace 96MB, we can also make process.size 1568MB
> (1.5G + 32MB).
> According to the spreadsheet
> ,
> 1.5GB process size and 64MB metaspace result in memory sizes with the
> values to be powers of 2.
> When increasing the metaspace from 64MB to 96MB, it would be good to
> preserve that alignment, for better readability that later we explain the
> memory configuration and calculations in documents.
> I believe it's not a big difference between 1.5GB and 1.5GB + 32 MB for
> memory consumption.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 15, 2020 at 11:55 AM Xintong Song 
> wrote:
>
>> Thanks for the discussion, Stephan, Till and Andrey.
>>
>> +1 for the managed fraction (0.4) and process.size (1.5G).
>>
>> *JVM overhead min 196 -> 192Mb (128 + 64)*
>>> small correction for better power 2 alignment of sizes
>>>
>> Sorry, this was a typo (and the same for the jira comment which is
>> copy-pasted). It was 192mb used in the tuning report.
>>
>> *meta space at least 96Mb?*
>>> There is still a concern about JVM metaspace being just 64Mb.
>>> We should confirm that it is not a problem by trying to test it also with
>>> the SQL jobs, Blink planner.
>>> Also, by running tpc-ds e2e Flink tests with this setting. Basically,
>>> where
>>> more classes are generated/loaded.
>>> We can look into this tomorrow.
>>>
>> I have already tried the setting metaspace to 64Mb with the e2e tests,
>> where I believe various sql / blink / tpc-ds test cases are included. (See
>> https://travis-ci.com/flink-ci/flink/builds/142970113 )
>> However, I'm also ok with 96Mb, since we are increasing the process.size
>> to 1.5G.
>> My original concern for having larger metaspace size was that we may
>> result in too small flink.size for out-of-box configuration on
>> containerized setups.
>>
>> *sanity check of JVM overhead*
>>> When the explicitly configured process and flink memory sizes are
>>> verified
>>> with the JVM meta space and overhead,
>>> JVM overhead does not have to be the exact fraction.
>>> It can be just within its min/max range, similar to how it is now for
>>> network/shuffle memory check after FLINK-15300.
>>>
>> Also +1 for this.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jan 15, 2020 at 6:16 AM Andrey Zagrebin 
>> wrote:
>>
>>> Hi all,
>>>
>>> Stephan, Till and me had another offline discussion today. Here is the
>>> outcome of our brainstorm.
>>>
>>> *managed fraction 0.4*
>>> just confirmed what we already discussed here.
>>>
>>> *process.size = 1536Mb (1,5Gb)*
>>> We agreed to have process.size in the default settings with the
>>> explanation
>>> of flink.size alternative in the comment.
>>> The suggestion is to increase it from 1024 to 1536mb. As you can see in
>>> the
>>> earlier provided calculation spreadsheet,
>>> it will result in bigger JVM Heap and managed memory (both ~0.5Gb) for
>>> all
>>> new setups.
>>> This should provide good enough experience for trying out Flink.
>>>
>>> *JVM overhead min 196 -> 192Mb (128 + 64)*
>>> small correction for better power 2 alignment of sizes
>>>
>>> *meta space at least 96Mb?*
>>> There is still a concern about JVM metaspace being just 64Mb.
>>> We should confirm that it is not a problem by trying to test it also with
>>> the SQL jobs, Blink planner.
>>> Also, by running tpc-ds e2e Flink tests with this setting. Basically,
>>> where
>>> more classes are generated/loaded.
>>> We can look into this tomorrow.
>>>
>>> *sanity check of JVM overhead*
>>> When the explicitly configured process and flink memory sizes are
>>> verified
>>> with the JVM meta space and overhead,
>>> JVM overhead does not have to be the exact fraction.
>>> It can be just within its min/max range, similar to how it is now for
>>> network/shuffle memory check after FLINK-15300.
>>>
>>> Best,Andrey
>>>
>>> On Tue, Jan 14, 2020 at 4:30 PM Stephan Ewen  wrote:
>>>
>>> > I like the idea of having a larger default "flink.size" in the
>>> config.yaml.
>>> > Maybe we don't need to double it, but something like 1280m would be
>>> okay?
>>> >
>>> > On Tue, Jan 14, 2020 at 3:47 PM Andrey Zagrebin 
>>> > wrote:
>>> >
>>> > > Hi all!
>>> > >
>>> > > Great that we have already 

Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-15 Thread Andrey Zagrebin
Hi HuWeihua,

I think your issue should be resolved with 1.9.2 and 1.10 (not yet released, but
in progress).
You can check the related Jira ticket [1].

Best,
Andrey

[1] https://jira.apache.org/jira/browse/FLINK-12122

On Wed, Jan 15, 2020 at 10:08 AM HuWeihua  wrote:

> Hi, All
> We encountered some problems during the upgrade from Flink 1.5 to Flink
> 1.9. Flink's scheduling strategy has changed. Flink 1.9 prefers centralized
> scheduling, while Flink 1.5 prefers decentralized scheduling. This change
> has caused resources imbalance and blocked our upgrade plan. We have
> thousands of jobs that need to be upgraded.
>
> For example,
> There is a job with 10 sources and 100 sinks. Each source need 1 core and
> each sink need 0.1 core.
> Try to run this job on Yarn, configure the numberOfTaskSlots is 10,
> yarn.containers.vcores is 2.
>
> When using Flink-1.5:
> Each TaskManager will run 1 source and 9 sinks, they need 1.9 cores
> totally. So the job with this configuration works very well. The schedule
> results is shown in Figure 1.
> When using Flink-1.9:
> The 10 sources will be scheduled to one TaskManager  and the 100 sinks
> will scheduled to other 10 TaskManagers.  The schedule results is shown
> in Figure 2.
> In this scenario, the TaskManager which run sources need 10 cores, other
> TaskManagers need 1 cores. But TaskManager must be configured the same, So
> we need 11 TaskManager with 10 cores.
> This situation waste (10-2)*11 = 88 cores more than Flink 1.5.
>
> In addition to the waste of resources, we also encountered other problems
> caused by centralized scheduling strategy.
>
>1. Network bandwidth. Tasks of the same type are scheduled to the one
>TaskManager, causing too much network traffic on the machine.
>
>
>1. Some jobs need to sink to the local agent. After centralized
>scheduling, the insufficient processing capacity of the single machine
>causes a backlog of consumption.
>
>
> In summary, we think a decentralized scheduling strategy is necessary.
>
>
> Figure 1. Flink 1.5 schedule results
>
> Figure 2. Flink 1.9 schedule results
>
>
>
> Best
> Weihua Hu
>
>


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-15 Thread Xintong Song
There is one more idea from an offline discussion with Andrey.

If we decide to make metaspace 96MB, we can also make process.size 1568MB
(1.5GB + 32MB).
According to the spreadsheet,
1.5GB process size and 64MB metaspace result in memory sizes whose values are
powers of 2.
When increasing the metaspace from 64MB to 96MB, it would be good to preserve
that alignment, for better readability when we later explain the memory
configuration and calculations in the documentation.
I believe the difference between 1.5GB and 1.5GB + 32MB is negligible for
memory consumption.

Thank you~

Xintong Song



On Wed, Jan 15, 2020 at 11:55 AM Xintong Song  wrote:

> Thanks for the discussion, Stephan, Till and Andrey.
>
> +1 for the managed fraction (0.4) and process.size (1.5G).
>
> *JVM overhead min 196 -> 192Mb (128 + 64)*
>> small correction for better power 2 alignment of sizes
>>
> Sorry, this was a typo (and the same for the jira comment which is
> copy-pasted). It was 192mb used in the tuning report.
>
> *meta space at least 96Mb?*
>> There is still a concern about JVM metaspace being just 64Mb.
>> We should confirm that it is not a problem by trying to test it also with
>> the SQL jobs, Blink planner.
>> Also, by running tpc-ds e2e Flink tests with this setting. Basically,
>> where
>> more classes are generated/loaded.
>> We can look into this tomorrow.
>>
> I have already tried the setting metaspace to 64Mb with the e2e tests,
> where I believe various sql / blink / tpc-ds test cases are included. (See
> https://travis-ci.com/flink-ci/flink/builds/142970113 )
> However, I'm also ok with 96Mb, since we are increasing the process.size
> to 1.5G.
> My original concern for having larger metaspace size was that we may
> result in too small flink.size for out-of-box configuration on
> containerized setups.
>
> *sanity check of JVM overhead*
>> When the explicitly configured process and flink memory sizes are verified
>> with the JVM meta space and overhead,
>> JVM overhead does not have to be the exact fraction.
>> It can be just within its min/max range, similar to how it is now for
>> network/shuffle memory check after FLINK-15300.
>>
> Also +1 for this.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 15, 2020 at 6:16 AM Andrey Zagrebin 
> wrote:
>
>> Hi all,
>>
>> Stephan, Till and me had another offline discussion today. Here is the
>> outcome of our brainstorm.
>>
>> *managed fraction 0.4*
>> just confirmed what we already discussed here.
>>
>> *process.size = 1536Mb (1,5Gb)*
>> We agreed to have process.size in the default settings with the
>> explanation
>> of flink.size alternative in the comment.
>> The suggestion is to increase it from 1024 to 1536mb. As you can see in
>> the
>> earlier provided calculation spreadsheet,
>> it will result in bigger JVM Heap and managed memory (both ~0.5Gb) for all
>> new setups.
>> This should provide good enough experience for trying out Flink.
>>
>> *JVM overhead min 196 -> 192Mb (128 + 64)*
>> small correction for better power 2 alignment of sizes
>>
>> *meta space at least 96Mb?*
>> There is still a concern about JVM metaspace being just 64Mb.
>> We should confirm that it is not a problem by trying to test it also with
>> the SQL jobs, Blink planner.
>> Also, by running tpc-ds e2e Flink tests with this setting. Basically,
>> where
>> more classes are generated/loaded.
>> We can look into this tomorrow.
>>
>> *sanity check of JVM overhead*
>> When the explicitly configured process and flink memory sizes are verified
>> with the JVM meta space and overhead,
>> JVM overhead does not have to be the exact fraction.
>> It can be just within its min/max range, similar to how it is now for
>> network/shuffle memory check after FLINK-15300.
>>
>> Best,Andrey
>>
>> On Tue, Jan 14, 2020 at 4:30 PM Stephan Ewen  wrote:
>>
>> > I like the idea of having a larger default "flink.size" in the
>> config.yaml.
>> > Maybe we don't need to double it, but something like 1280m would be
>> okay?
>> >
>> > On Tue, Jan 14, 2020 at 3:47 PM Andrey Zagrebin 
>> > wrote:
>> >
>> > > Hi all!
>> > >
>> > > Great that we have already tried out new FLIP-49 with the bigger jobs.
>> > >
>> > > I am also +1 for the JVM metaspace and overhead changes.
>> > >
>> > > Regarding 0.3 vs 0.4 for managed memory, +1 for having more managed
>> > memory
>> > > for Rocksdb limiting case.
>> > >
>> > > In general, this looks mostly to be about memory distribution between
>> JVM
>> > > heap and managed off-heap.
>> > > Comparing to the previous default setup, the JVM heap dropped
>> (especially
>> > > for standalone) mostly due to moving managed from heap to off-heap and
>> > then
>> > > also adding framework off-heap.
>> > > In general, this can be the most important consequence for beginners
>> and
>> > > those who rely on the default configuration.
>> > > Especially the legacy default configuration in standalone with falling
>> > 

[jira] [Created] (FLINK-15597) Relax sanity check of JVM memory overhead to be within its min/max

2020-01-15 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-15597:
---

 Summary: Relax sanity check of JVM memory overhead to be within 
its min/max
 Key: FLINK-15597
 URL: https://issues.apache.org/jira/browse/FLINK-15597
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration, Runtime / Task
Reporter: Andrey Zagrebin
Assignee: Xintong Song
 Fix For: 1.10.0


When the explicitly configured process and Flink memory sizes are verified against 
the JVM metaspace and overhead, the JVM overhead does not have to match the exact 
fraction.
It only needs to be within its min/max range, similar to how the network/shuffle 
memory check works after FLINK-15300.
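
A minimal sketch of the relaxed check (hypothetical helper, not the actual Flink 
validation code):

{code}
public class JvmOverheadCheckSketch {

    // Relaxed check: the configured overhead only needs to fall within [min, max],
    // rather than equal processSize * fraction exactly.
    static boolean withinRange(long overheadBytes, long minBytes, long maxBytes) {
        return overheadBytes >= minBytes && overheadBytes <= maxBytes;
    }

    public static void main(String[] args) {
        long min = 192L << 20;                 // e.g. 192 MB
        long max = 1024L << 20;                // e.g. 1 GB
        long configuredOverhead = 200L << 20;  // not exactly the fraction, but in range
        System.out.println(withinRange(configuredOverhead, min, max)); // true -> accepted
    }
}
{code}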



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Please give me the permission as a contributor

2020-01-15 Thread m...@lishiyu.cn
Hi Guys,

I want to contribute to Apache Flink.
Would you please give me the permission as a contributor?
My JIRA ID is lishiyu.


m...@lishiyu.cn