[jira] [Created] (FLINK-21920) Optimize DefaultScheduler#allocateSlots

2021-03-22 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21920:


 Summary: Optimize DefaultScheduler#allocateSlots
 Key: FLINK-21920
 URL: https://issues.apache.org/jira/browse/FLINK-21920
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


Based on the scheduler benchmark introduced in FLINK-21731, we find that 
several procedures related to {{DefaultScheduler#allocateSlots}} have 
O(N^2) complexity.

 

The first one is: 
{{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}.
 The original implementation is:

 
{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the
    ExecutionSlotSharingGroup in which the producer vertex is located is
    available for the current vertex{code}
This procedure has O(N^2) complexity.

It's obvious that the result partitions in the same ConsumedPartitionGroup have 
the same producer vertex. So we can just iterate over the 
ConsumedPartitionGroups instead of all the consumed partitions. This will 
decrease the complexity from O(N^2) to O(N).
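The group-based check can be sketched as follows. This is an illustrative simplification only: the class and method names (ExecutionSlotSharingGroupBuilder internals are not reproduced here) and the string-keyed maps are hypothetical stand-ins, not Flink's actual API.

```java
import java.util.*;

class SlotSharingSketch {
    // ConsumedPartitionGroup id -> slot sharing group of the producer.
    // All partitions in a group share one producer, so one lookup per
    // group replaces one lookup per consumed partition.
    static Map<String, String> producerGroupOf = new HashMap<>();
    static Set<String> availableGroups = new HashSet<>();

    // O(#groups) instead of O(#partitions) per vertex.
    static Optional<String> findAvailableProducerGroup(List<String> consumedPartitionGroups) {
        for (String groupId : consumedPartitionGroups) {
            String slotGroup = producerGroupOf.get(groupId);
            if (slotGroup != null && availableGroups.contains(slotGroup)) {
                return Optional.of(slotGroup);
            }
        }
        return Optional.empty();
    }
}
```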

 

The second one is: 
{{ExecutionGraphToInputsLocationsRetrieverAdapter#getConsumedResultPartitionsProducers}}.
 The original implementation is:
{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all ConsumedPartitionGroup of the SchedulingExecutionVertex:
for all IntermediateResultPartition in the ConsumedPartitionGroup:
  get producer of the IntermediateResultPartition {code}
This procedure has O(N^2) complexity.

We can see that for each SchedulingExecutionVertex, the producers of its 
ConsumedPartitionGroup are calculated separately. However, 
SchedulingExecutionVertices in the same ConsumerVertexGroup share the same 
ConsumedPartitionGroup, so we don't need to calculate the producers over and 
over again. We can use a local cache for the producers. This will decrease 
the complexity from O(N^2) to O(N).
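The caching idea can be sketched as below. Again, all identifiers here are hypothetical illustrations, not the actual ExecutionGraphToInputsLocationsRetrieverAdapter code.

```java
import java.util.*;

class ProducerCacheSketch {
    // IntermediateResultPartition id -> producer ExecutionVertex id.
    static Map<String, String> producerOf = new HashMap<>();

    // Local cache: ConsumedPartitionGroup id -> producers, so vertices in
    // the same ConsumerVertexGroup reuse one computation instead of each
    // recomputing it (O(N) overall instead of O(N^2)).
    static Map<String, List<String>> cache = new HashMap<>();

    static List<String> producersOfGroup(String groupId, List<String> partitions) {
        return cache.computeIfAbsent(groupId, id -> {
            List<String> producers = new ArrayList<>();
            for (String p : partitions) {
                producers.add(producerOf.get(p));
            }
            return producers;
        });
    }
}
```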

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21919) JsonPlanTestBase should extends AbstractTestBase

2021-03-22 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-21919:


 Summary: JsonPlanTestBase should extends AbstractTestBase
 Key: FLINK-21919
 URL: https://issues.apache.org/jira/browse/FLINK-21919
 Project: Flink
  Issue Type: Improvement
Reporter: Jingsong Lee
 Fix For: 1.13.0


If we don't inherit AbstractTestBase, the tests may be unstable in different 
environments. Our IT cases should inherit it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21918) Add execution.runtime-mode setter in StreamExecutionEnvironment

2021-03-22 Thread Dian Fu (Jira)
Dian Fu created FLINK-21918:
---

 Summary: Add execution.runtime-mode setter in 
StreamExecutionEnvironment
 Key: FLINK-21918
 URL: https://issues.apache.org/jira/browse/FLINK-21918
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21917) Add back missing zh docs

2021-03-22 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-21917:
--

 Summary: Add back missing zh docs
 Key: FLINK-21917
 URL: https://issues.apache.org/jira/browse/FLINK-21917
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.13.0
Reporter: Leonard Xu
 Fix For: 1.13.0


Some Chinese doc pages were lost when migrating from the legacy doc website to 
the current one in https://github.com/apache/flink/pull/14903, such as `jdbc 
zh` and `kafka zh`.

We should add them back.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Split PyFlink packages into two packages: apache-flink and apache-flink-libraries

2021-03-22 Thread Xingbo Huang
Thanks for the feedback everyone. I will proceed if there is no objection.

Best,
Xingbo

Till Rohrmann  于2021年3月22日周一 下午5:30写道:

> If there is no other way, then I would say let's go with splitting the
> modules. This is already better than keeping the Flink binaries bundled
> with every Python/platform package.
>
> Cheers,
> Till
>
> On Mon, Mar 22, 2021 at 8:28 AM Xingbo Huang  wrote:
>
> > When we **pip install** a wheel package, it just unpacks the wheel
> package
> > and installs its dependencies[1]. There is no way to download things from
> > an external website during installation. It works differently from the
> > source package where we could download something in the setup.py. This is
> > explained in detail in [2]. So I'm afraid that splitting the package is
> the
> > only solution we have if we want to reduce the package size of pyflink.
> >
> > [1] https://www.python.org/dev/peps/pep-0427/
> > [2] https://realpython.com/python-wheels/#advantages-of-python-wheels
> >
> > Best,
> > Xingbo
> >
> > Till Rohrmann  于2021年3月19日周五 下午6:32写道:
> >
> > > I think that we should try to reduce the size of the packages by either
> > > splitting them or by having another means to retrieve the Java
> binaries.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Mar 19, 2021 at 2:58 AM Xingbo Huang 
> wrote:
> > >
> > > > Hi Till,
> > > >
> > > > The package size of tensorflow[1] is also very big(about 300MB+).
> > > However,
> > > > it does not try to solve the problem, but expands the space limit in
> > PyPI
> > > > frequently whenever the project space is full. We could also choose
> > this
> > > > option. According to our current release frequency, we probably need
> to
> > > > apply for 15GB expansion every year. There are not too many similar
> > > cases,
> > > > so there is also no standard solution to refer to. But the behavior
> of
> > > > splitting a project into multiple packages is quite common. For
> > example,
> > > > apache airflow will prepare a corresponding release package for each
> > > > provider[2].
> > > >
> > > > So I think there are currently two solutions in my mind which could
> > work.
> > > >
> > > > 1. Just keep the current solution and expand the space limit in PyPI
> > > > whenever the space is full.
> > > >
> > > > 2. Split into two packages to reduce the wheel package size.
> > > >
> > > > [1] https://pypi.org/project/tensorflow/#files
> > > > [2] https://pypi.org/search/?q=apache-airflow-*=
> > > >
> > > > Best,
> > > > Xingbo
> > > >
> > > > Till Rohrmann  于2021年3月17日周三 下午9:22写道:
> > > >
> > > > > How do other projects solve this problem?
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Wed, Mar 17, 2021 at 3:45 AM Xingbo Huang 
> > > wrote:
> > > > >
> > > > > > Hi Chesnay,
> > > > > >
> > > > > > Yes, in most cases, we can indeed download the required jars in
> > > > > `setup.py`,
> > > > > > which is also the solution I originally thought of reducing the
> > size
> > > of
> > > > > > wheel packages. However, I'm afraid that it will not work in
> > > scenarios
> > > > > when
> > > > > > accessing the external network is not possible which is very
> common
> > > in
> > > > > the
> > > > > > production cluster.
> > > > > >
> > > > > > Best,
> > > > > > Xingbo
> > > > > >
> > > > > > Chesnay Schepler  于2021年3月16日周二 下午8:32写道:
> > > > > >
> > > > > > > This proposed apache-flink-libraries package would just contain
> > the
> > > > > > > binary, right? And effectively be unusable to the python
> audience
> > > on
> > > > > > > it's own.
> > > > > > >
> > > > > > > Essentially we are just abusing Pypi for shipping a java
> binary.
> > Is
> > > > > > > there no way for us to download the jars when the python
> package
> > is
> > > > > > > being installed? (e.g., in setup.py)
> > > > > > >
> > > > > > > On 3/16/2021 1:23 PM, Dian Fu wrote:
> > > > > > > > Yes, the size of .whl file in PyFlink will also be about 3MB
> if
> > > we
> > > > > > split
> > > > > > > the package. Currently the package is big because we bundled
> the
> > > jar
> > > > > > files
> > > > > > > in it.
> > > > > > > >
> > > > > > > >> 2021年3月16日 下午8:13,Chesnay Schepler  写道:
> > > > > > > >>
> > > > > > > >> key difference being that the beam .whl files are 3mb large,
> > aka
> > > > 60x
> > > > > > > smaller.
> > > > > > > >>
> > > > > > > >> On 3/16/2021 1:06 PM, Dian Fu wrote:
> > > > > > > >>> Hi Chesnay,
> > > > > > > >>>
> > > > > > > >>> We will publish binary packages separately for:
> > > > > > > >>> 1) Python 3.5 / 3.6 / 3.7 / 3.8 (since 1.12) separately
> > > > > > > >>> 2) Linux / Mac separately
> > > > > > > >>>
> > > > > > > >>> Besides, there is also a source package which is used when
> > none
> > > > of
> > > > > > the
> > > > > > > above binary packages is usable, e.g. for Window users.
> > > > > > > >>>
> > > > > > > >>> PS: publishing multiple binary packages is very common in
> > > Python
> > > > > > > world, e.g. Beam published 22 packages in 2.28, Pandas
> published
> > 16

[jira] [Created] (FLINK-21916) Allows multiple kinds of ManagedMemoryUseCase for the same operator

2021-03-22 Thread Dian Fu (Jira)
Dian Fu created FLINK-21916:
---

 Summary: Allows multiple kinds of ManagedMemoryUseCase for the 
same operator
 Key: FLINK-21916
 URL: https://issues.apache.org/jira/browse/FLINK-21916
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.13.0


Currently, the implementation of 
[SimpleTransformationTranslator|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java#L119]
 restricts that the OPERATOR and PYTHON use cases cannot be used in the same 
operator. For the batch mode of the Python DataStream API:
- The OPERATOR use case is added first in 
BatchExecutionUtils.applyBatchExecutionSettings for a keyed operator during 
translation.
- Then the PYTHON use case, which is set in the transformation, cannot be 
added in 
[SimpleTransformationTranslator|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java#L119]
 because streamNode.getManagedMemoryOperatorScopeUseCaseWeights() is not empty.

We should remove the restriction in 
[SimpleTransformationTranslator|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java#L119]
 if there are no special reasons to keep it.
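Conceptually, the restriction could be relaxed by merging use-case weights instead of rejecting a second kind. The sketch below is only a hypothetical illustration of that idea; the enum values match ManagedMemoryUseCase names from the ticket, but the merge method and its signature are assumptions, not Flink's actual code.

```java
import java.util.*;

enum ManagedMemoryUseCase { OPERATOR, PYTHON }

class WeightsSketch {
    // Merge newly declared weights into the existing per-operator weights,
    // summing when the same use case appears twice, rather than failing
    // because the existing map is non-empty.
    static Map<ManagedMemoryUseCase, Integer> merge(
            Map<ManagedMemoryUseCase, Integer> existing,
            Map<ManagedMemoryUseCase, Integer> declared) {
        Map<ManagedMemoryUseCase, Integer> merged = new HashMap<>(existing);
        declared.forEach((useCase, weight) -> merged.merge(useCase, weight, Integer::sum));
        return merged;
    }
}
```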



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21915) Optimize Execution#finishPartitionsAndUpdateConsumers

2021-03-22 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21915:


 Summary: Optimize Execution#finishPartitionsAndUpdateConsumers
 Key: FLINK-21915
 URL: https://issues.apache.org/jira/browse/FLINK-21915
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


Based on the scheduler benchmark {{PartitionReleaseInBatchJobBenchmark}} 
introduced in FLINK-20612, we find that there's another procedure that has 
O(N^2) computation complexity: 
{{Execution#finishPartitionsAndUpdateConsumers}}. 

Once an execution finishes, it finishes all its BLOCKING partitions and 
updates the partition info for all consumer vertices. The procedure can be 
illustrated with the following pseudo code:
{code:java}
for all Execution in ExecutionGraph:
  for all produced IntermediateResultPartition of the Execution:
for all consumer ExecutionVertex of the IntermediateResultPartition:
  update or cache partition info{code}
This procedure has O(N^2) complexity in total.

Based on FLINK-21326, the consumed partitions are grouped if they are connected 
to the same consumer vertices. Therefore, we can update partition info of the 
entire ConsumedPartitionGroup in batch, rather than one by one. This will 
decrease the complexity from O(N^2) to O(N).
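The batched update can be sketched as follows. The names and string-keyed maps are illustrative stand-ins, not the actual Execution / ConsumedPartitionGroup API.

```java
import java.util.*;

class PartitionUpdateSketch {
    // ConsumedPartitionGroup id -> consumer ExecutionVertex ids.
    // FLINK-21326 groups partitions connected to the same consumers,
    // so each group's consumers can be updated once, in batch.
    static Map<String, List<String>> consumersOfGroup = new HashMap<>();
    static List<String> notified = new ArrayList<>();

    // One pass over the group's consumers instead of one pass per
    // (partition, consumer) pair: O(N) overall rather than O(N^2).
    static void finishPartitionGroup(String groupId) {
        for (String consumer : consumersOfGroup.getOrDefault(groupId, List.of())) {
            notified.add(consumer); // update or cache partition info
        }
    }
}
```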



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Move Flink ML pipeline API and library code to a separate repository named flink-ml

2021-03-22 Thread Dian Fu
+1

Regards,
Dian

On Fri, Mar 19, 2021 at 6:39 PM Till Rohrmann  wrote:

> +1
>
> Cheers,
> Till
>
> On Fri, Mar 19, 2021 at 9:02 AM Becket Qin  wrote:
>
> > +1. Thanks for driving the effort, Dong.
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Mar 19, 2021 at 10:07 AM Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > I would like to start the vote for moving Flink ML pipeline API and
> > library
> > > code to a separate repository named flink-ml.
> > >
> > > The main motivation is to allow us to remove SQL planner in Flink 1.14
> > > while still allowing ML pipeline API/library development in the coming
> > > year,
> > >
> > > The vote will last for at least 72 hours, following the consensus
> voting
> > > process.
> > >
> > > Discussion thread:
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Flink-ML-pipeline-API-and-library-code-to-a-separate-repository-named-flink-ml-tc49420.html
> > >
> > >
> > > Thanks!
> > > Dong
> > >
> >
>


[jira] [Created] (FLINK-21914) Trying to access closed classloader

2021-03-22 Thread Spongebob (Jira)
Spongebob created FLINK-21914:
-

 Summary: Trying to access closed classloader
 Key: FLINK-21914
 URL: https://issues.apache.org/jira/browse/FLINK-21914
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.12.2
 Environment: flink: 1.12.2

hadoop: 3.1.3

hive: 3.1.2

 
Reporter: Spongebob
 Attachments: app.log

I am trying to deploy a Flink application on YARN, but got this exception:

Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to 
access closed classloader. Please check if you store classloaders directly or 
indirectly in static fields. If the stacktrace suggests that the leak occurs in 
a third party library and cannot be fixed immediately, you can disable this 
check with the configuration 'classloader.check-leaked-classloader'.

This application passed tests in my local environment. It reads from and 
writes to Hive via the Flink table environment. See the attached YARN log, 
from which the source and sink data info has been deleted.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21913) Update DynamicTableFactory.Context to use ResolvedCatalogTable

2021-03-22 Thread Timo Walther (Jira)
Timo Walther created FLINK-21913:


 Summary: Update DynamicTableFactory.Context to use 
ResolvedCatalogTable
 Key: FLINK-21913
 URL: https://issues.apache.org/jira/browse/FLINK-21913
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Update DynamicTableFactory.Context to use ResolvedCatalogTable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21912) Introduce Schema and ResolvedSchema in Python API

2021-03-22 Thread Timo Walther (Jira)
Timo Walther created FLINK-21912:


 Summary: Introduce Schema and ResolvedSchema in Python API
 Key: FLINK-21912
 URL: https://issues.apache.org/jira/browse/FLINK-21912
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Timo Walther


The changes of FLIP-164 should also be applied to the Python API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21911) Support arithmetic MIN/MAX in SQL

2021-03-22 Thread Timo Walther (Jira)
Timo Walther created FLINK-21911:


 Summary: Support arithmetic MIN/MAX in SQL
 Key: FLINK-21911
 URL: https://issues.apache.org/jira/browse/FLINK-21911
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


We should discuss whether we want to support math MIN / MAX in Flink SQL. It 
seems that other vendors do not support it out of the box either:

[https://stackoverflow.com/questions/124417/is-there-a-max-function-in-sql-server-that-takes-two-values-like-math-max-in-ne]

But it might be quite useful and a common operation in JVM languages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Split PyFlink packages into two packages: apache-flink and apache-flink-libraries

2021-03-22 Thread Till Rohrmann
If there is no other way, then I would say let's go with splitting the
modules. This is already better than keeping the Flink binaries bundled
with every Python/platform package.

Cheers,
Till

On Mon, Mar 22, 2021 at 8:28 AM Xingbo Huang  wrote:

> When we **pip install** a wheel package, it just unpacks the wheel package
> and installs its dependencies[1]. There is no way to download things from
> an external website during installation. It works differently from the
> source package where we could download something in the setup.py. This is
> explained in detail in [2]. So I'm afraid that splitting the package is the
> only solution we have if we want to reduce the package size of pyflink.
>
> [1] https://www.python.org/dev/peps/pep-0427/
> [2] https://realpython.com/python-wheels/#advantages-of-python-wheels
>
> Best,
> Xingbo
>
> Till Rohrmann  于2021年3月19日周五 下午6:32写道:
>
> > I think that we should try to reduce the size of the packages by either
> > splitting them or by having another means to retrieve the Java binaries.
> >
> > Cheers,
> > Till
> >
> > On Fri, Mar 19, 2021 at 2:58 AM Xingbo Huang  wrote:
> >
> > > Hi Till,
> > >
> > > The package size of tensorflow[1] is also very big(about 300MB+).
> > However,
> > > it does not try to solve the problem, but expands the space limit in
> PyPI
> > > frequently whenever the project space is full. We could also choose
> this
> > > option. According to our current release frequency, we probably need to
> > > apply for 15GB expansion every year. There are not too many similar
> > cases,
> > > so there is also no standard solution to refer to. But the behavior of
> > > splitting a project into multiple packages is quite common. For
> example,
> > > apache airflow will prepare a corresponding release package for each
> > > provider[2].
> > >
> > > So I think there are currently two solutions in my mind which could
> work.
> > >
> > > 1. Just keep the current solution and expand the space limit in PyPI
> > > whenever the space is full.
> > >
> > > 2. Split into two packages to reduce the wheel package size.
> > >
> > > [1] https://pypi.org/project/tensorflow/#files
> > > [2] https://pypi.org/search/?q=apache-airflow-*=
> > >
> > > Best,
> > > Xingbo
> > >
> > > Till Rohrmann  于2021年3月17日周三 下午9:22写道:
> > >
> > > > How do other projects solve this problem?
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Mar 17, 2021 at 3:45 AM Xingbo Huang 
> > wrote:
> > > >
> > > > > Hi Chesnay,
> > > > >
> > > > > Yes, in most cases, we can indeed download the required jars in
> > > > `setup.py`,
> > > > > which is also the solution I originally thought of reducing the
> size
> > of
> > > > > wheel packages. However, I'm afraid that it will not work in
> > scenarios
> > > > when
> > > > > accessing the external network is not possible which is very common
> > in
> > > > the
> > > > > production cluster.
> > > > >
> > > > > Best,
> > > > > Xingbo
> > > > >
> > > > > Chesnay Schepler  于2021年3月16日周二 下午8:32写道:
> > > > >
> > > > > > This proposed apache-flink-libraries package would just contain
> the
> > > > > > binary, right? And effectively be unusable to the python audience
> > on
> > > > > > it's own.
> > > > > >
> > > > > > Essentially we are just abusing Pypi for shipping a java binary.
> Is
> > > > > > there no way for us to download the jars when the python package
> is
> > > > > > being installed? (e.g., in setup.py)
> > > > > >
> > > > > > On 3/16/2021 1:23 PM, Dian Fu wrote:
> > > > > > > Yes, the size of .whl file in PyFlink will also be about 3MB if
> > we
> > > > > split
> > > > > > the package. Currently the package is big because we bundled the
> > jar
> > > > > files
> > > > > > in it.
> > > > > > >
> > > > > > >> 2021年3月16日 下午8:13,Chesnay Schepler  写道:
> > > > > > >>
> > > > > > >> key difference being that the beam .whl files are 3mb large,
> aka
> > > 60x
> > > > > > smaller.
> > > > > > >>
> > > > > > >> On 3/16/2021 1:06 PM, Dian Fu wrote:
> > > > > > >>> Hi Chesnay,
> > > > > > >>>
> > > > > > >>> We will publish binary packages separately for:
> > > > > > >>> 1) Python 3.5 / 3.6 / 3.7 / 3.8 (since 1.12) separately
> > > > > > >>> 2) Linux / Mac separately
> > > > > > >>>
> > > > > > >>> Besides, there is also a source package which is used when
> none
> > > of
> > > > > the
> > > > > > above binary packages is usable, e.g. for Window users.
> > > > > > >>>
> > > > > > >>> PS: publishing multiple binary packages is very common in
> > Python
> > > > > > world, e.g. Beam published 22 packages in 2.28, Pandas published
> 16
> > > > > > packages in 1.2.3 [2]. We could also publishing more packages if
> we
> > > > > > splitting the packages as the cost of adding another package will
> > be
> > > > very
> > > > > > small.
> > > > > > >>>
> > > > > > >>> Regards,
> > > > > > >>> Dian
> > > > > > >>>
> > > > > > >>> [1] https://pypi.org/project/apache-beam/#files <
> > > > > > https://pypi.org/project/apache-beam/#files> <
> > > > > > 

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-22 Thread Stephan Ewen
Hi Yingjie!

Thanks for doing those experiments, the results look good. Let's go ahead
with 32M then.

Regarding the key, I am not strongly opinionated there. There are arguments
for both keys, (1) making the key part of the network pool config as you
did here or (2) making it part of the TM config (relative to framework
off-heap memory). I find (1) quite understandable, but it is personal
taste, so I can go with either option.

Best,
Stephan


On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧)  wrote:

> Hi all,
>
> I have tested the default memory size with both batch (tpcds) and
> streaming jobs running in one session cluster (more than 100 queries). The
> result is:
> 1. All settings (16M, 32M and 64M) can work well without any OOM.
> 2. For streaming jobs running after batch jobs, there is no performance or
> stability regression.
> 2. 32M and 64M is better (over 10%) in terms of performance for the test
> batch job on HDD.
>
> Based on the above results, I think 32M is a good default choice, because
> the performance is good enough for the test job and compared to 64M, more
> direct memory can be used by netty and other components. What do you think?
>
> BTW, about the configuration key, do we reach a consensus? I am
> temporarily using taskmanager.memory.network.batch-shuffle-read.size in
> my PR now. Any suggestions about that?
>
> Best,
> Yingjie (Kevin)
>
> --
> 发件人:Guowei Ma
> 日 期:2021年03月09日 17:28:35
> 收件人:曹英杰(北牧)
> 抄 送:Till Rohrmann; Stephan Ewen;
> dev; user; Xintong Song<
> tonysong...@gmail.com>
> 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM
> merge shuffle
>
> Hi, all
>
> Thanks all for your suggestions and feedback.
> I think it is a good idea that we increase the default size of the
> separated pool by testing. I am fine with adding the suffix(".size") to the
> config name, which makes it more clear to the user.
> But I am a little worried about adding a prefix("framework") because
> currently the tm shuffle service is only a shuffle-plugin, which is not a
> part of the framework. So maybe we could add a clear explanation in the
> document?
>
> Best,
> Guowei
>
>
> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧)  wrote:
>
>> Thanks for the suggestions. I will do some tests and share the results
>> after the implementation is ready. Then we can give a proper default value.
>>
>> Best,
>> Yingjie
>>
>> --
>> 发件人:Till Rohrmann
>> 日 期:2021年03月05日 23:03:10
>> 收件人:Stephan Ewen
>> 抄 送:dev; user; Xintong Song<
>> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma<
>> guowei@gmail.com>
>> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>> shuffle
>>
>> Thanks for this proposal Guowei. +1 for it.
>>
>> Concerning the default size, maybe we can run some experiments and see
>> how the system behaves with different pool sizes.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen  wrote:
>>
>>> Thanks Guowei, for the proposal.
>>>
>>> As discussed offline already, I think this sounds good.
>>>
>>> One thought is that 16m sounds very small for a default read buffer
>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:
>>>
 Hi, all


 In the Flink 1.12 we introduce the TM merge shuffle. But the
 out-of-the-box experience of using TM merge shuffle is not very good. The
 main reason is that the default configuration always makes users encounter
 OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
 to avoid the problem.
 Goals

1. Don't affect the streaming and pipelined-shuffle-only batch
setups.
2. Don't mix memory with different life cycle in the same pool.
E.g., write buffers needed by running tasks and read buffer needed even
after tasks being finished.
3. User can use the TM merge shuffle with default memory
configurations. (May need further tunings for performance optimization, 
 but
should not fail with the default configurations.)

 Proposal

1. Introduce a configuration
`taskmanager.memory.network.batch-read` to specify the size of this 
 memory
pool. The default value is 16m.
2. Allocate the pool lazily. It means that the memory pool would be
allocated when the TM merge shuffle is used at the first time.
3. This pool size will not be add up to the TM's total memory size,
but will be considered part of
`taskmanager.memory.framework.off-heap.size`. We need to check that the
pool size is not larger than the framework off-heap size, if TM merge
shuffle is enabled.


 In this default configuration, the allocation of the memory pool is
 almost impossible to fail. Currently the default 

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-22 Thread Till Rohrmann
Thanks for the update Yingjie. Then let's go with 32 MB I would say.

Concerning the name of the configuration option I see Xintong's point. If
the batch shuffle is subtracted from
`taskmanager.memory.framework.off-heap.size` because it is part of the
off-heap pool, then something like
`taskmanager.memory.framework.off-heap.batch-shuffle.size` would better
reflect this situation. On the other hand, this is quite a long
configuration name. But it is also a quite advanced configuration option
which, hopefully, should not be touched by too many of our users.

Cheers,
Till

On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧)  wrote:

> Hi all,
>
> I have tested the default memory size with both batch (tpcds) and
> streaming jobs running in one session cluster (more than 100 queries). The
> result is:
> 1. All settings (16M, 32M and 64M) can work well without any OOM.
> 2. For streaming jobs running after batch jobs, there is no performance or
> stability regression.
> 2. 32M and 64M is better (over 10%) in terms of performance for the test
> batch job on HDD.
>
> Based on the above results, I think 32M is a good default choice, because
> the performance is good enough for the test job and compared to 64M, more
> direct memory can be used by netty and other components. What do you think?
>
> BTW, about the configuration key, do we reach a consensus? I am
> temporarily using taskmanager.memory.network.batch-shuffle-read.size in
> my PR now. Any suggestions about that?
>
> Best,
> Yingjie (Kevin)
>
> --
> 发件人:Guowei Ma
> 日 期:2021年03月09日 17:28:35
> 收件人:曹英杰(北牧)
> 抄 送:Till Rohrmann; Stephan Ewen;
> dev; user; Xintong Song<
> tonysong...@gmail.com>
> 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM
> merge shuffle
>
> Hi, all
>
> Thanks all for your suggestions and feedback.
> I think it is a good idea that we increase the default size of the
> separated pool by testing. I am fine with adding the suffix(".size") to the
> config name, which makes it more clear to the user.
> But I am a little worried about adding a prefix("framework") because
> currently the tm shuffle service is only a shuffle-plugin, which is not a
> part of the framework. So maybe we could add a clear explanation in the
> document?
>
> Best,
> Guowei
>
>
> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧)  wrote:
>
>> Thanks for the suggestions. I will do some tests and share the results
>> after the implementation is ready. Then we can give a proper default value.
>>
>> Best,
>> Yingjie
>>
>> --
>> 发件人:Till Rohrmann
>> 日 期:2021年03月05日 23:03:10
>> 收件人:Stephan Ewen
>> 抄 送:dev; user; Xintong Song<
>> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma<
>> guowei@gmail.com>
>> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>> shuffle
>>
>> Thanks for this proposal Guowei. +1 for it.
>>
>> Concerning the default size, maybe we can run some experiments and see
>> how the system behaves with different pool sizes.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen  wrote:
>>
>>> Thanks Guowei, for the proposal.
>>>
>>> As discussed offline already, I think this sounds good.
>>>
>>> One thought is that 16m sounds very small for a default read buffer
>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:
>>>
 Hi, all


 In the Flink 1.12 we introduce the TM merge shuffle. But the
 out-of-the-box experience of using TM merge shuffle is not very good. The
 main reason is that the default configuration always makes users encounter
 OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
 to avoid the problem.
 Goals

1. Don't affect the streaming and pipelined-shuffle-only batch
setups.
2. Don't mix memory with different life cycle in the same pool.
E.g., write buffers needed by running tasks and read buffer needed even
after tasks being finished.
3. User can use the TM merge shuffle with default memory
configurations. (May need further tunings for performance optimization, 
 but
should not fail with the default configurations.)

 Proposal

1. Introduce a configuration
`taskmanager.memory.network.batch-read` to specify the size of this 
 memory
pool. The default value is 16m.
2. Allocate the pool lazily. It means that the memory pool would be
allocated when the TM merge shuffle is used at the first time.
3. This pool size will not be add up to the TM's total memory size,
but will be considered part of
`taskmanager.memory.framework.off-heap.size`. We need to check that the
pool size is not larger than the framework off-heap size, if TM merge
shuffle is enabled.


 In 

[jira] [Created] (FLINK-21910) Wrong Checks whether the service has been started.

2021-03-22 Thread pengkangjing (Jira)
pengkangjing created FLINK-21910:


 Summary: Wrong Checks whether the service has been started.
 Key: FLINK-21910
 URL: https://issues.apache.org/jira/browse/FLINK-21910
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: pengkangjing






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Split PyFlink packages into two packages: apache-flink and apache-flink-libraries

2021-03-22 Thread Xingbo Huang
When we **pip install** a wheel package, it just unpacks the wheel package
and installs its dependencies[1]. There is no way to download things from
an external website during installation. It works differently from the
source package where we could download something in the setup.py. This is
explained in detail in [2]. So I'm afraid that splitting the package is the
only solution we have if we want to reduce the package size of pyflink.

[1] https://www.python.org/dev/peps/pep-0427/
[2] https://realpython.com/python-wheels/#advantages-of-python-wheels
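
Xingbo's distinction can be made concrete: a source distribution executes setup.py at install time, so a custom build command there could fetch the Java binaries, while a wheel is only unpacked by pip and no such hook ever runs. A minimal sketch, assuming setuptools is available; the jar URL and filename are hypothetical:

```python
# Sketch, assuming setuptools; the jar URL and filename are hypothetical.
# An sdist executes setup.py at install time, so a custom build command can
# fetch the Java binaries then. A wheel is only unpacked by pip, so no such
# hook ever runs -- which is why a wheel cannot download its jars on demand.
import urllib.request
from setuptools.command.build_py import build_py

JAR_URL = "https://example.org/flink-dist.jar"  # hypothetical location


class DownloadJarsThenBuild(build_py):
    def run(self):
        # Runs during "pip install" of an sdist, but never for a wheel.
        urllib.request.urlretrieve(JAR_URL, "flink-dist.jar")
        super().run()

# In setup.py: setup(..., cmdclass={"build_py": DownloadJarsThenBuild})
```

As noted elsewhere in the thread, this approach only works when the install host has external network access, which is often not the case in production clusters.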

Best,
Xingbo

Till Rohrmann wrote on Friday, March 19, 2021 at 6:32 PM:

> I think that we should try to reduce the size of the packages by either
> splitting them or by having another means to retrieve the Java binaries.
>
> Cheers,
> Till
>
> On Fri, Mar 19, 2021 at 2:58 AM Xingbo Huang  wrote:
>
> > Hi Till,
> >
> > The package size of tensorflow[1] is also very big (about 300 MB+).
> > However, it does not try to solve the problem, but instead frequently
> > expands its space limit on PyPI whenever the project space is full. We
> > could also choose this option. According to our current release
> > frequency, we would probably need to apply for a 15 GB expansion every
> > year. There are not many similar cases, so there is also no standard
> > solution to refer to. But splitting a project into multiple packages is
> > quite common. For example, Apache Airflow prepares a corresponding
> > release package for each provider[2].
> >
> > So I think there are currently two solutions in my mind which could work.
> >
> > 1. Just keep the current solution and expand the space limit in PyPI
> > whenever the space is full.
> >
> > 2. Split into two packages to reduce the wheel package size.
> >
> > [1] https://pypi.org/project/tensorflow/#files
> > [2] https://pypi.org/search/?q=apache-airflow-*=
> >
> > Best,
> > Xingbo
> >
> > Till Rohrmann wrote on Wednesday, March 17, 2021 at 9:22 PM:
> >
> > > How do other projects solve this problem?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Mar 17, 2021 at 3:45 AM Xingbo Huang wrote:
> > >
> > > > Hi Chesnay,
> > > >
> > > > Yes, in most cases we can indeed download the required jars in
> > > > `setup.py`, which is also the solution I originally thought of for
> > > > reducing the size of the wheel packages. However, I'm afraid it
> > > > will not work in scenarios where access to the external network is
> > > > not possible, which is very common in production clusters.
> > > >
> > > > Best,
> > > > Xingbo
> > > >
> > > > Chesnay Schepler wrote on Tuesday, March 16, 2021 at 8:32 PM:
> > > >
> > > > > This proposed apache-flink-libraries package would just contain the
> > > > > binary, right? And effectively be unusable to the Python audience
> > > > > on its own.
> > > > >
> > > > > Essentially we are just abusing PyPI for shipping a Java binary. Is
> > > > > there no way for us to download the jars when the Python package is
> > > > > being installed? (e.g., in setup.py)
> > > > >
> > > > > On 3/16/2021 1:23 PM, Dian Fu wrote:
> > > > > > Yes, the size of the .whl file in PyFlink will also be about
> > > > > > 3 MB if we split the package. Currently the package is big
> > > > > > because we bundled the jar files in it.
> > > > > >
> > > > > >> On March 16, 2021 at 8:13 PM, Chesnay Schepler wrote:
> > > > > >>
> > > > > >> The key difference being that the Beam .whl files are 3 MB,
> > > > > >> i.e., 60x smaller.
> > > > > >>
> > > > > >> On 3/16/2021 1:06 PM, Dian Fu wrote:
> > > > > >>> Hi Chesnay,
> > > > > >>>
> > > > > >>> We will publish binary packages separately for:
> > > > > >>> 1) Python 3.5 / 3.6 / 3.7 / 3.8 (since 1.12) separately
> > > > > >>> 2) Linux / Mac separately
> > > > > >>>
> > > > > >>> Besides, there is also a source package which is used when none
> > > > > >>> of the above binary packages is usable, e.g. for Windows users.
> > > > > >>>
> > > > > >>> PS: publishing multiple binary packages is very common in the
> > > > > >>> Python world, e.g. Beam published 22 packages in 2.28 [1], and
> > > > > >>> Pandas published 16 packages in 1.2.3 [2]. We could also publish
> > > > > >>> more packages if we split the package, as the cost of adding
> > > > > >>> another package will be very small.
> > > > > >>>
> > > > > >>> Regards,
> > > > > >>> Dian
> > > > > >>>
> > > > > >>> [1] https://pypi.org/project/apache-beam/#files
> > > > > >>> [2] https://pypi.org/project/pandas/#files
> > > > > >>>
> > > > > >>>
> > > > > >>> Hi Xintong,
> > > > > >>>
> > > > > >>> Yes, you are right that there is 9 packages in 1.12 as we added
> > > > Python
> > > > > 3.8 support in 1.12.
> > > > > >>>
> > > > > >>> Regards,
> > > > > >>> Dian
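
The "9 packages" figure is simple arithmetic over the release matrix Dian describes (4 Python versions, 2 platforms, plus one source package); a quick check:

```python
# Release matrix per the thread: one binary wheel per (Python version,
# platform) pair, plus one source distribution for everything else
# (e.g. Windows users).
python_versions = ["3.5", "3.6", "3.7", "3.8"]  # 3.8 was added in Flink 1.12
platforms = ["linux", "mac"]

binary_wheels = [(v, p) for v in python_versions for p in platforms]
total_packages = len(binary_wheels) + 1  # +1 for the source package
print(total_packages)  # 9
```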
> > > > > >>>
> > > > >  On March 16, 2021 at 7:45 PM, Xintong Song wrote:
> > > > > 
> > > > >  And it's not only uploaded to PyPI, but