[jira] [Created] (FLINK-21920) Optimize DefaultScheduler#allocateSlots
Zhilong Hong created FLINK-21920:

Summary: Optimize DefaultScheduler#allocateSlots
Key: FLINK-21920
URL: https://issues.apache.org/jira/browse/FLINK-21920
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
Fix For: 1.13.0

Based on the scheduler benchmark introduced in FLINK-21731, we find that several procedures related to {{DefaultScheduler#allocateSlots}} have O(N^2) complexity.

The first one is {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}. The original implementation is:

{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the
    ExecutionSlotSharingGroup where the producer vertex is located is
    available for the current vertex
{code}

This procedure has O(N^2) complexity. Since the result partitions in the same ConsumedPartitionGroup have the same producer vertex, we can iterate over the ConsumedPartitionGroups instead of all the consumed partitions. This decreases the complexity from O(N^2) to O(N).

The second one is {{ExecutionGraphToInputsLocationsRetrieverAdapter#getConsumedResultPartitionsProducers}}. The original implementation is:

{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all ConsumedPartitionGroup of the SchedulingExecutionVertex:
    for all IntermediateResultPartition in the ConsumedPartitionGroup:
      get producer of the IntermediateResultPartition
{code}

This procedure has O(N^2) complexity. For each SchedulingExecutionVertex, the producers of its ConsumedPartitionGroups are calculated separately, even though SchedulingExecutionVertices in the same ConsumerVertexGroup share the same ConsumedPartitionGroup. Thus, we don't need to calculate the producers over and over again: we can use a local cache for them.
This will decrease the complexity from O(N^2) to O(N). -- This message was sent by Atlassian Jira (v8.3.4#803005)
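The second optimization described above (a local cache of producers keyed by ConsumedPartitionGroup) can be sketched as follows. This is a simplified model, not the actual Flink classes: group and vertex ids are plain strings, and `compute` stands in for the real per-group producer lookup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified sketch of the proposed producer cache -- these are NOT the
// actual Flink types. Vertices in the same ConsumerVertexGroup share a
// ConsumedPartitionGroup, so the group's producers only need to be
// computed once.
class ProducerCache {
    private final Map<String, List<String>> producersByGroup = new HashMap<>();

    // 'compute' stands in for the expensive per-group producer lookup.
    List<String> getProducers(String groupId,
                              Function<String, List<String>> compute) {
        // computeIfAbsent runs the lookup once per group, so iterating all
        // consumer vertices costs O(N) overall instead of O(N^2).
        return producersByGroup.computeIfAbsent(groupId, compute);
    }
}
```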
[jira] [Created] (FLINK-21919) JsonPlanTestBase should extends AbstractTestBase
Jingsong Lee created FLINK-21919:

Summary: JsonPlanTestBase should extends AbstractTestBase
Key: FLINK-21919
URL: https://issues.apache.org/jira/browse/FLINK-21919
Project: Flink
Issue Type: Improvement
Reporter: Jingsong Lee
Fix For: 1.13.0

If we don't inherit AbstractTestBase, tests may be unstable in different environments. Our IT cases should inherit it.
[jira] [Created] (FLINK-21918) Add execution.runtime-mode setter in StreamExecutionEnvironment
Dian Fu created FLINK-21918:

Summary: Add execution.runtime-mode setter in StreamExecutionEnvironment
Key: FLINK-21918
URL: https://issues.apache.org/jira/browse/FLINK-21918
Project: Flink
Issue Type: Sub-task
Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
Fix For: 1.13.0
[jira] [Created] (FLINK-21917) Add back missing zh docs
Leonard Xu created FLINK-21917:

Summary: Add back missing zh docs
Key: FLINK-21917
URL: https://issues.apache.org/jira/browse/FLINK-21917
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.13.0
Reporter: Leonard Xu
Fix For: 1.13.0

Some Chinese doc pages were lost when migrating from the legacy doc website to the current one (https://github.com/apache/flink/pull/14903), such as `jdbc zh` and `kafka zh`. We should add them back.
Re: [DISCUSS] Split PyFlink packages into two packages: apache-flink and apache-flink-libraries
Thanks for the feedback everyone. I will proceed if there is no objection. Best, Xingbo Till Rohrmann 于2021年3月22日周一 下午5:30写道: > If there is no other way, then I would say let's go with splitting the > modules. This is already better than keeping the Flink binaries bundled > with every Python/platform package. > > Cheers, > Till > > On Mon, Mar 22, 2021 at 8:28 AM Xingbo Huang wrote: > > > When we **pip install** a wheel package, it just unpacks the wheel > package > > and installs its dependencies[1]. There is no way to download things from > > an external website during installation. It works differently from the > > source package where we could download something in the setup.py. This is > > explained in detail in [2]. So I'm afraid that splitting the package is > the > > only solution we have if we want to reduce the package size of pyflink. > > > > [1] https://www.python.org/dev/peps/pep-0427/ > > [2] https://realpython.com/python-wheels/#advantages-of-python-wheels > > > > Best, > > Xingbo > > > > Till Rohrmann 于2021年3月19日周五 下午6:32写道: > > > > > I think that we should try to reduce the size of the packages by either > > > splitting them or by having another means to retrieve the Java > binaries. > > > > > > Cheers, > > > Till > > > > > > On Fri, Mar 19, 2021 at 2:58 AM Xingbo Huang > wrote: > > > > > > > Hi Till, > > > > > > > > The package size of tensorflow[1] is also very big(about 300MB+). > > > However, > > > > it does not try to solve the problem, but expands the space limit in > > PyPI > > > > frequently whenever the project space is full. We could also choose > > this > > > > option. According to our current release frequency, we probably need > to > > > > apply for 15GB expansion every year. There are not too many similar > > > cases, > > > > so there is also no standard solution to refer to. But the behavior > of > > > > splitting a project into multiple packages is quite common. 
For > > example, > > > > apache airflow will prepare a corresponding release package for each > > > > provider[2]. > > > > > > > > So I think there are currently two solutions in my mind which could > > work. > > > > > > > > 1. Just keep the current solution and expand the space limit in PyPI > > > > whenever the space is full. > > > > > > > > 2. Split into two packages to reduce the wheel package size. > > > > > > > > [1] https://pypi.org/project/tensorflow/#files > > > > [2] https://pypi.org/search/?q=apache-airflow-*= > > > > > > > > Best, > > > > Xingbo > > > > > > > > Till Rohrmann 于2021年3月17日周三 下午9:22写道: > > > > > > > > > How do other projects solve this problem? > > > > > > > > > > Cheers, > > > > > Till > > > > > > > > > > On Wed, Mar 17, 2021 at 3:45 AM Xingbo Huang > > > wrote: > > > > > > > > > > > Hi Chesnay, > > > > > > > > > > > > Yes, in most cases, we can indeed download the required jars in > > > > > `setup.py`, > > > > > > which is also the solution I originally thought of reducing the > > size > > > of > > > > > > wheel packages. However, I'm afraid that it will not work in > > > scenarios > > > > > when > > > > > > accessing the external network is not possible which is very > common > > > in > > > > > the > > > > > > production cluster. > > > > > > > > > > > > Best, > > > > > > Xingbo > > > > > > > > > > > > Chesnay Schepler 于2021年3月16日周二 下午8:32写道: > > > > > > > > > > > > > This proposed apache-flink-libraries package would just contain > > the > > > > > > > binary, right? And effectively be unusable to the python > audience > > > on > > > > > > > it's own. > > > > > > > > > > > > > > Essentially we are just abusing Pypi for shipping a java > binary. > > Is > > > > > > > there no way for us to download the jars when the python > package > > is > > > > > > > being installed? 
(e.g., in setup.py) > > > > > > > > > > > > > > On 3/16/2021 1:23 PM, Dian Fu wrote: > > > > > > > > Yes, the size of .whl file in PyFlink will also be about 3MB > if > > > we > > > > > > split > > > > > > > the package. Currently the package is big because we bundled > the > > > jar > > > > > > files > > > > > > > in it. > > > > > > > > > > > > > > > >> 2021年3月16日 下午8:13,Chesnay Schepler 写道: > > > > > > > >> > > > > > > > >> key difference being that the beam .whl files are 3mb large, > > aka > > > > 60x > > > > > > > smaller. > > > > > > > >> > > > > > > > >> On 3/16/2021 1:06 PM, Dian Fu wrote: > > > > > > > >>> Hi Chesnay, > > > > > > > >>> > > > > > > > >>> We will publish binary packages separately for: > > > > > > > >>> 1) Python 3.5 / 3.6 / 3.7 / 3.8 (since 1.12) separately > > > > > > > >>> 2) Linux / Mac separately > > > > > > > >>> > > > > > > > >>> Besides, there is also a source package which is used when > > none > > > > of > > > > > > the > > > > > > > above binary packages is usable, e.g. for Window users. > > > > > > > >>> > > > > > > > >>> PS: publishing multiple binary packages is very common in > > > Python > > > > > > > world, e.g. Beam published 22 packages in 2.28, Pandas > published > > 16
[jira] [Created] (FLINK-21916) Allows multiple kinds of ManagedMemoryUseCase for the same operator
Dian Fu created FLINK-21916:

Summary: Allows multiple kinds of ManagedMemoryUseCase for the same operator
Key: FLINK-21916
URL: https://issues.apache.org/jira/browse/FLINK-21916
Project: Flink
Issue Type: Sub-task
Components: API / DataStream
Reporter: Dian Fu
Assignee: Dian Fu
Fix For: 1.13.0

Currently, the implementation of [SimpleTransformationTranslator|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java#L119] restricts the OPERATOR and PYTHON use cases from being used in the same operator. For the batch mode of the Python DataStream API:
- The OPERATOR use case is added first in BatchExecutionUtils.applyBatchExecutionSettings for a keyed operator during translation.
- The PYTHON use case, which is set in the transformation, then cannot be added in SimpleTransformationTranslator, as streamNode.getManagedMemoryOperatorScopeUseCaseWeights() is not empty.

We need to remove this restriction in SimpleTransformationTranslator if there are no special reasons for it.
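One possible direction for lifting the restriction, sketched with a plain map rather than Flink's actual StreamNode API: merge newly declared weights into the existing map instead of rejecting when it is already non-empty. `UseCase` here is a hypothetical stand-in for the real ManagedMemoryUseCase enum.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for Flink's ManagedMemoryUseCase enum; the real
// class has more members and scope information.
enum UseCase { OPERATOR, PYTHON }

// Sketch of one way the restriction could be lifted: merge new weights into
// the existing map instead of failing when it is already non-empty.
class UseCaseWeights {
    private final Map<UseCase, Integer> weights = new EnumMap<>(UseCase.class);

    void declare(UseCase useCase, int weight) {
        // Keep the larger weight if the same use case is declared twice.
        weights.merge(useCase, weight, Math::max);
    }

    Map<UseCase, Integer> getWeights() {
        return weights;
    }
}
```

Whether merging by maximum is the right conflict policy is exactly the kind of question the issue leaves open.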
[jira] [Created] (FLINK-21915) Optimize Execution#finishPartitionsAndUpdateConsumers
Zhilong Hong created FLINK-21915:

Summary: Optimize Execution#finishPartitionsAndUpdateConsumers
Key: FLINK-21915
URL: https://issues.apache.org/jira/browse/FLINK-21915
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
Fix For: 1.13.0

Based on the scheduler benchmark {{PartitionReleaseInBatchJobBenchmark}} introduced in FLINK-20612, we find that another procedure has O(N^2) computation complexity: {{Execution#finishPartitionsAndUpdateConsumers}}. Once an execution is finished, it finishes all its BLOCKING partitions and updates the partition info to all consumer vertices. The procedure can be illustrated as the following pseudo code:

{code:java}
for all Execution in ExecutionGraph:
  for all produced IntermediateResultPartition of the Execution:
    for all consumer ExecutionVertex of the IntermediateResultPartition:
      update or cache partition info
{code}

This procedure has O(N^2) complexity in total. Based on FLINK-21326, the consumed partitions are grouped if they are connected to the same consumer vertices. Therefore, we can update the partition info of the entire ConsumedPartitionGroup in batch, rather than one by one. This decreases the complexity from O(N^2) to O(N).
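The complexity difference can be illustrated with a toy cost model (this is not Flink code; it only counts update operations): N finished partitions each consumed by the same group of N vertices cost N*N per-pair updates, versus N batched per-group updates.

```java
// Toy cost comparison, not Flink code: it counts update operations for the
// two strategies described in the issue.
class UpdateCost {
    // One update per (partition, consumer vertex) pair: O(N^2).
    static long perConsumerPair(int partitions, int consumers) {
        long ops = 0;
        for (int p = 0; p < partitions; p++) {
            for (int c = 0; c < consumers; c++) {
                ops++; // update one consumer vertex for one partition
            }
        }
        return ops;
    }

    // One batched update per partition for the whole
    // ConsumedPartitionGroup: O(N).
    static long perGroup(int partitions) {
        long ops = 0;
        for (int p = 0; p < partitions; p++) {
            ops++; // one update covers all consumers in the group
        }
        return ops;
    }
}
```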
Re: [VOTE] Move Flink ML pipeline API and library code to a separate repository named flink-ml
+1 Regards, Dian On Fri, Mar 19, 2021 at 6:39 PM Till Rohrmann wrote: > +1 > > Cheers, > Till > > On Fri, Mar 19, 2021 at 9:02 AM Becket Qin wrote: > > > +1. Thanks for driving the effort, Dong. > > > > Jiangjie (Becket) Qin > > > > On Fri, Mar 19, 2021 at 10:07 AM Dong Lin wrote: > > > > > Hi all, > > > > > > I would like to start the vote for moving Flink ML pipeline API and > > library > > > code to a separate repository named flink-ml. > > > > > > The main motivation is to allow us to remove SQL planner in Flink 1.14 > > > while still allowing ML pipeline API/library development in the coming > > > year, > > > > > > The vote will last for at least 72 hours, following the consensus > voting > > > process. > > > > > > Discussion thread: > > > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Flink-ML-pipeline-API-and-library-code-to-a-separate-repository-named-flink-ml-tc49420.html > > > > > > > > > Thanks! > > > Dong > > > > > >
[jira] [Created] (FLINK-21914) Trying to access closed classloader
Spongebob created FLINK-21914:

Summary: Trying to access closed classloader
Key: FLINK-21914
URL: https://issues.apache.org/jira/browse/FLINK-21914
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.12.2
Environment: flink: 1.12.2, hadoop: 3.1.3, hive: 3.1.2
Reporter: Spongebob
Attachments: app.log

I am trying to deploy a Flink application on YARN, but got this exception:

Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.

The application passed tests in my local environment. It reads from and writes to Hive via the Flink table environment. See the attached YARN log, from which the source and sink data info has been removed.
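For context, the behavior behind this error message can be modeled with a small sketch. This is NOT Flink's actual user-code classloader, just a toy loader reproducing the same safety-net pattern: once the loader is closed, any further class lookup fails with IllegalStateException, which is how leaked references held in static fields get detected.

```java
// Minimal model of the safety-net check behind the reported error -- not
// Flink's real classloader implementation. After close(), any class lookup
// throws IllegalStateException instead of silently using a dead loader.
class CheckedClassLoader extends ClassLoader {
    private volatile boolean closed;

    void close() {
        closed = true;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        if (closed) {
            throw new IllegalStateException("Trying to access closed classloader.");
        }
        // This toy loader defines no classes of its own.
        throw new ClassNotFoundException(name);
    }
}
```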
[jira] [Created] (FLINK-21913) Update DynamicTableFactory.Context to use ResolvedCatalogTable
Timo Walther created FLINK-21913:

Summary: Update DynamicTableFactory.Context to use ResolvedCatalogTable
Key: FLINK-21913
URL: https://issues.apache.org/jira/browse/FLINK-21913
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther

Update DynamicTableFactory.Context to use ResolvedCatalogTable.
[jira] [Created] (FLINK-21912) Introduce Schema and ResolvedSchema in Python API
Timo Walther created FLINK-21912:

Summary: Introduce Schema and ResolvedSchema in Python API
Key: FLINK-21912
URL: https://issues.apache.org/jira/browse/FLINK-21912
Project: Flink
Issue Type: Sub-task
Components: API / Python
Reporter: Timo Walther

The changes of FLIP-164 should also be applied to the Python API.
[jira] [Created] (FLINK-21911) Support arithmetic MIN/MAX in SQL
Timo Walther created FLINK-21911:

Summary: Support arithmetic MIN/MAX in SQL
Key: FLINK-21911
URL: https://issues.apache.org/jira/browse/FLINK-21911
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Reporter: Timo Walther

We should discuss whether we want to support arithmetic MIN / MAX in Flink SQL. It seems other vendors also do not support it out of the box: [https://stackoverflow.com/questions/124417/is-there-a-max-function-in-sql-server-that-takes-two-values-like-math-max-in-ne] But it might be quite useful, being a common operation in JVM languages.
Re: [DISCUSS] Split PyFlink packages into two packages: apache-flink and apache-flink-libraries
If there is no other way, then I would say let's go with splitting the modules. This is already better than keeping the Flink binaries bundled with every Python/platform package. Cheers, Till On Mon, Mar 22, 2021 at 8:28 AM Xingbo Huang wrote: > When we **pip install** a wheel package, it just unpacks the wheel package > and installs its dependencies[1]. There is no way to download things from > an external website during installation. It works differently from the > source package where we could download something in the setup.py. This is > explained in detail in [2]. So I'm afraid that splitting the package is the > only solution we have if we want to reduce the package size of pyflink. > > [1] https://www.python.org/dev/peps/pep-0427/ > [2] https://realpython.com/python-wheels/#advantages-of-python-wheels > > Best, > Xingbo > > Till Rohrmann 于2021年3月19日周五 下午6:32写道: > > > I think that we should try to reduce the size of the packages by either > > splitting them or by having another means to retrieve the Java binaries. > > > > Cheers, > > Till > > > > On Fri, Mar 19, 2021 at 2:58 AM Xingbo Huang wrote: > > > > > Hi Till, > > > > > > The package size of tensorflow[1] is also very big(about 300MB+). > > However, > > > it does not try to solve the problem, but expands the space limit in > PyPI > > > frequently whenever the project space is full. We could also choose > this > > > option. According to our current release frequency, we probably need to > > > apply for 15GB expansion every year. There are not too many similar > > cases, > > > so there is also no standard solution to refer to. But the behavior of > > > splitting a project into multiple packages is quite common. For > example, > > > apache airflow will prepare a corresponding release package for each > > > provider[2]. > > > > > > So I think there are currently two solutions in my mind which could > work. > > > > > > 1. 
Just keep the current solution and expand the space limit in PyPI > > > whenever the space is full. > > > > > > 2. Split into two packages to reduce the wheel package size. > > > > > > [1] https://pypi.org/project/tensorflow/#files > > > [2] https://pypi.org/search/?q=apache-airflow-*= > > > > > > Best, > > > Xingbo > > > > > > Till Rohrmann 于2021年3月17日周三 下午9:22写道: > > > > > > > How do other projects solve this problem? > > > > > > > > Cheers, > > > > Till > > > > > > > > On Wed, Mar 17, 2021 at 3:45 AM Xingbo Huang > > wrote: > > > > > > > > > Hi Chesnay, > > > > > > > > > > Yes, in most cases, we can indeed download the required jars in > > > > `setup.py`, > > > > > which is also the solution I originally thought of reducing the > size > > of > > > > > wheel packages. However, I'm afraid that it will not work in > > scenarios > > > > when > > > > > accessing the external network is not possible which is very common > > in > > > > the > > > > > production cluster. > > > > > > > > > > Best, > > > > > Xingbo > > > > > > > > > > Chesnay Schepler 于2021年3月16日周二 下午8:32写道: > > > > > > > > > > > This proposed apache-flink-libraries package would just contain > the > > > > > > binary, right? And effectively be unusable to the python audience > > on > > > > > > it's own. > > > > > > > > > > > > Essentially we are just abusing Pypi for shipping a java binary. > Is > > > > > > there no way for us to download the jars when the python package > is > > > > > > being installed? (e.g., in setup.py) > > > > > > > > > > > > On 3/16/2021 1:23 PM, Dian Fu wrote: > > > > > > > Yes, the size of .whl file in PyFlink will also be about 3MB if > > we > > > > > split > > > > > > the package. Currently the package is big because we bundled the > > jar > > > > > files > > > > > > in it. > > > > > > > > > > > > > >> 2021年3月16日 下午8:13,Chesnay Schepler 写道: > > > > > > >> > > > > > > >> key difference being that the beam .whl files are 3mb large, > aka > > > 60x > > > > > > smaller. 
> > > > > > >> > > > > > > >> On 3/16/2021 1:06 PM, Dian Fu wrote: > > > > > > >>> Hi Chesnay, > > > > > > >>> > > > > > > >>> We will publish binary packages separately for: > > > > > > >>> 1) Python 3.5 / 3.6 / 3.7 / 3.8 (since 1.12) separately > > > > > > >>> 2) Linux / Mac separately > > > > > > >>> > > > > > > >>> Besides, there is also a source package which is used when > none > > > of > > > > > the > > > > > > above binary packages is usable, e.g. for Window users. > > > > > > >>> > > > > > > >>> PS: publishing multiple binary packages is very common in > > Python > > > > > > world, e.g. Beam published 22 packages in 2.28, Pandas published > 16 > > > > > > packages in 1.2.3 [2]. We could also publishing more packages if > we > > > > > > splitting the packages as the cost of adding another package will > > be > > > > very > > > > > > small. > > > > > > >>> > > > > > > >>> Regards, > > > > > > >>> Dian > > > > > > >>> > > > > > > >>> [1] https://pypi.org/project/apache-beam/#files < > > > > > > https://pypi.org/project/apache-beam/#files> < > > > > > >
Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle
Hi Yingjie! Thanks for doing those experiments, the results look good. Let's go ahead with 32M then. Regarding the key, I am not strongly opinionated there. There are arguments for both keys, (1) making the key part of the network pool config as you did here or (2) making it part of the TM config (relative to framework off-heap memory). I find (1) quite understandable, but it is personal taste, so I can go with either option. Best, Stephan On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) wrote: > Hi all, > > I have tested the default memory size with both batch (tpcds) and > streaming jobs running in one session cluster (more than 100 queries). The > result is: > 1. All settings (16M, 32M and 64M) can work well without any OOM. > 2. For streaming jobs running after batch jobs, there is no performance or > stability regression. > 2. 32M and 64M is better (over 10%) in terms of performance for the test > batch job on HDD. > > Based on the above results, I think 32M is a good default choice, because > the performance is good enough for the test job and compared to 64M, more > direct memory can be used by netty and other components. What do you think? > > BTW, about the configuration key, do we reach a consensus? I am > temporarily using taskmanager.memory.network.batch-shuffle-read.size in > my PR now. Any suggestions about that? > > Best, > Yingjie (Kevin) > > -- > 发件人:Guowei Ma > 日 期:2021年03月09日 17:28:35 > 收件人:曹英杰(北牧) > 抄 送:Till Rohrmann; Stephan Ewen; > dev; user; Xintong Song< > tonysong...@gmail.com> > 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM > merge shuffle > > Hi, all > > Thanks all for your suggestions and feedback. > I think it is a good idea that we increase the default size of the > separated pool by testing. I am fine with adding the suffix(".size") to the > config name, which makes it more clear to the user. 
> But I am a little worried about adding a prefix("framework") because > currently the tm shuffle service is only a shuffle-plugin, which is not a > part of the framework. So maybe we could add a clear explanation in the > document? > > Best, > Guowei > > > On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) wrote: > >> Thanks for the suggestions. I will do some tests and share the results >> after the implementation is ready. Then we can give a proper default value. >> >> Best, >> Yingjie >> >> -- >> 发件人:Till Rohrmann >> 日 期:2021年03月05日 23:03:10 >> 收件人:Stephan Ewen >> 抄 送:dev; user; Xintong Song< >> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma< >> guowei@gmail.com> >> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge >> shuffle >> >> Thanks for this proposal Guowei. +1 for it. >> >> Concerning the default size, maybe we can run some experiments and see >> how the system behaves with different pool sizes. >> >> Cheers, >> Till >> >> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen wrote: >> >>> Thanks Guowei, for the proposal. >>> >>> As discussed offline already, I think this sounds good. >>> >>> One thought is that 16m sounds very small for a default read buffer >>> pool. How risky do you think it is to increase this to 32m or 64m? >>> >>> Best, >>> Stephan >>> >>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma wrote: >>> Hi, all In the Flink 1.12 we introduce the TM merge shuffle. But the out-of-the-box experience of using TM merge shuffle is not very good. The main reason is that the default configuration always makes users encounter OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle to avoid the problem. Goals 1. Don't affect the streaming and pipelined-shuffle-only batch setups. 2. Don't mix memory with different life cycle in the same pool. E.g., write buffers needed by running tasks and read buffer needed even after tasks being finished. 3. User can use the TM merge shuffle with default memory configurations. 
(May need further tuning for performance optimization, but should not fail with the default configurations.)

Proposal
1. Introduce a configuration `taskmanager.memory.network.batch-read` to specify the size of this memory pool. The default value is 16m.
2. Allocate the pool lazily. That means the memory pool is allocated when the TM merge shuffle is used for the first time.
3. This pool size will not be added up to the TM's total memory size, but will be considered part of `taskmanager.memory.framework.off-heap.size`. We need to check that the pool size is not larger than the framework off-heap size, if TM merge shuffle is enabled. In this default configuration, the allocation of the memory pool is almost impossible to fail.

Currently the default
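The size check implied by point 3 of the proposal can be sketched as a standalone validation (the option names follow the thread; the byte values in the usage are illustrative only, not recommended settings):

```java
// Sketch of the check from point 3 of the proposal: the batch-shuffle read
// pool is carved out of framework off-heap memory, so the configured pool
// must not exceed that budget.
class BatchReadMemoryCheck {
    static void check(long batchReadBytes, long frameworkOffHeapBytes) {
        if (batchReadBytes > frameworkOffHeapBytes) {
            throw new IllegalArgumentException(
                "taskmanager.memory.network.batch-read (" + batchReadBytes
                    + " bytes) must not exceed"
                    + " taskmanager.memory.framework.off-heap.size ("
                    + frameworkOffHeapBytes + " bytes)");
        }
    }
}
```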
Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle
Thanks for the update Yingjie. Then let's go with 32 MB I would say. Concerning the name of the configuration option I see Xintong's point. If the batch shuffle is subtracted from `taskmanager.memory.framework.off-heap.size` because it is part of the off-heap pool, then something like `taskmanager.memory.framework.off-heap.batch-shuffle.size` would better reflect this situation. On the other hand, this is quite a long configuration name. But it is also a quite advanced configuration option which, hopefully, should not be touched by too many of our users. Cheers, Till On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) wrote: > Hi all, > > I have tested the default memory size with both batch (tpcds) and > streaming jobs running in one session cluster (more than 100 queries). The > result is: > 1. All settings (16M, 32M and 64M) can work well without any OOM. > 2. For streaming jobs running after batch jobs, there is no performance or > stability regression. > 2. 32M and 64M is better (over 10%) in terms of performance for the test > batch job on HDD. > > Based on the above results, I think 32M is a good default choice, because > the performance is good enough for the test job and compared to 64M, more > direct memory can be used by netty and other components. What do you think? > > BTW, about the configuration key, do we reach a consensus? I am > temporarily using taskmanager.memory.network.batch-shuffle-read.size in > my PR now. Any suggestions about that? > > Best, > Yingjie (Kevin) > > -- > 发件人:Guowei Ma > 日 期:2021年03月09日 17:28:35 > 收件人:曹英杰(北牧) > 抄 送:Till Rohrmann; Stephan Ewen; > dev; user; Xintong Song< > tonysong...@gmail.com> > 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM > merge shuffle > > Hi, all > > Thanks all for your suggestions and feedback. > I think it is a good idea that we increase the default size of the > separated pool by testing. I am fine with adding the suffix(".size") to the > config name, which makes it more clear to the user. 
> But I am a little worried about adding a prefix("framework") because > currently the tm shuffle service is only a shuffle-plugin, which is not a > part of the framework. So maybe we could add a clear explanation in the > document? > > Best, > Guowei > > > On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) wrote: > >> Thanks for the suggestions. I will do some tests and share the results >> after the implementation is ready. Then we can give a proper default value. >> >> Best, >> Yingjie >> >> -- >> 发件人:Till Rohrmann >> 日 期:2021年03月05日 23:03:10 >> 收件人:Stephan Ewen >> 抄 送:dev; user; Xintong Song< >> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma< >> guowei@gmail.com> >> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge >> shuffle >> >> Thanks for this proposal Guowei. +1 for it. >> >> Concerning the default size, maybe we can run some experiments and see >> how the system behaves with different pool sizes. >> >> Cheers, >> Till >> >> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen wrote: >> >>> Thanks Guowei, for the proposal. >>> >>> As discussed offline already, I think this sounds good. >>> >>> One thought is that 16m sounds very small for a default read buffer >>> pool. How risky do you think it is to increase this to 32m or 64m? >>> >>> Best, >>> Stephan >>> >>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma wrote: >>> Hi, all In the Flink 1.12 we introduce the TM merge shuffle. But the out-of-the-box experience of using TM merge shuffle is not very good. The main reason is that the default configuration always makes users encounter OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle to avoid the problem. Goals 1. Don't affect the streaming and pipelined-shuffle-only batch setups. 2. Don't mix memory with different life cycle in the same pool. E.g., write buffers needed by running tasks and read buffer needed even after tasks being finished. 3. User can use the TM merge shuffle with default memory configurations. 
(May need further tuning for performance optimization, but should not fail with the default configurations.)

Proposal
1. Introduce a configuration `taskmanager.memory.network.batch-read` to specify the size of this memory pool. The default value is 16m.
2. Allocate the pool lazily. That means the memory pool is allocated when the TM merge shuffle is used for the first time.
3. This pool size will not be added up to the TM's total memory size, but will be considered part of `taskmanager.memory.framework.off-heap.size`. We need to check that the pool size is not larger than the framework off-heap size, if TM merge shuffle is enabled. In
[jira] [Created] (FLINK-21910) Wrong Checks whether the service has been started.
pengkangjing created FLINK-21910:

Summary: Wrong Checks whether the service has been started.
Key: FLINK-21910
URL: https://issues.apache.org/jira/browse/FLINK-21910
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Reporter: pengkangjing
Re: [DISCUSS] Split PyFlink packages into two packages: apache-flink and apache-flink-libraries
When we **pip install** a wheel package, it just unpacks the wheel package and installs its dependencies[1]. There is no way to download things from an external website during installation. It works differently from the source package where we could download something in the setup.py. This is explained in detail in [2]. So I'm afraid that splitting the package is the only solution we have if we want to reduce the package size of pyflink. [1] https://www.python.org/dev/peps/pep-0427/ [2] https://realpython.com/python-wheels/#advantages-of-python-wheels Best, Xingbo Till Rohrmann 于2021年3月19日周五 下午6:32写道: > I think that we should try to reduce the size of the packages by either > splitting them or by having another means to retrieve the Java binaries. > > Cheers, > Till > > On Fri, Mar 19, 2021 at 2:58 AM Xingbo Huang wrote: > > > Hi Till, > > > > The package size of tensorflow[1] is also very big(about 300MB+). > However, > > it does not try to solve the problem, but expands the space limit in PyPI > > frequently whenever the project space is full. We could also choose this > > option. According to our current release frequency, we probably need to > > apply for 15GB expansion every year. There are not too many similar > cases, > > so there is also no standard solution to refer to. But the behavior of > > splitting a project into multiple packages is quite common. For example, > > apache airflow will prepare a corresponding release package for each > > provider[2]. > > > > So I think there are currently two solutions in my mind which could work. > > > > 1. Just keep the current solution and expand the space limit in PyPI > > whenever the space is full. > > > > 2. Split into two packages to reduce the wheel package size. > > > > [1] https://pypi.org/project/tensorflow/#files > > [2] https://pypi.org/search/?q=apache-airflow-*= > > > > Best, > > Xingbo > > > > Till Rohrmann 于2021年3月17日周三 下午9:22写道: > > > > > How do other projects solve this problem? 
> > > > > > Cheers, > > > Till > > > > > > On Wed, Mar 17, 2021 at 3:45 AM Xingbo Huang > wrote: > > > > > > > Hi Chesnay, > > > > > > > > Yes, in most cases, we can indeed download the required jars in > > > `setup.py`, > > > > which is also the solution I originally thought of reducing the size > of > > > > wheel packages. However, I'm afraid that it will not work in > scenarios > > > when > > > > accessing the external network is not possible which is very common > in > > > the > > > > production cluster. > > > > > > > > Best, > > > > Xingbo > > > > > > > > Chesnay Schepler 于2021年3月16日周二 下午8:32写道: > > > > > > > > > This proposed apache-flink-libraries package would just contain the > > > > > binary, right? And effectively be unusable to the python audience > on > > > > > it's own. > > > > > > > > > > Essentially we are just abusing Pypi for shipping a java binary. Is > > > > > there no way for us to download the jars when the python package is > > > > > being installed? (e.g., in setup.py) > > > > > > > > > > On 3/16/2021 1:23 PM, Dian Fu wrote: > > > > > > Yes, the size of .whl file in PyFlink will also be about 3MB if > we > > > > split > > > > > the package. Currently the package is big because we bundled the > jar > > > > files > > > > > in it. > > > > > > > > > > > >> 2021年3月16日 下午8:13,Chesnay Schepler 写道: > > > > > >> > > > > > >> key difference being that the beam .whl files are 3mb large, aka > > 60x > > > > > smaller. > > > > > >> > > > > > >> On 3/16/2021 1:06 PM, Dian Fu wrote: > > > > > >>> Hi Chesnay, > > > > > >>> > > > > > >>> We will publish binary packages separately for: > > > > > >>> 1) Python 3.5 / 3.6 / 3.7 / 3.8 (since 1.12) separately > > > > > >>> 2) Linux / Mac separately > > > > > >>> > > > > > >>> Besides, there is also a source package which is used when none > > of > > > > the > > > > > above binary packages is usable, e.g. for Window users. 
> > > > > >>> > > > > > >>> PS: publishing multiple binary packages is very common in > Python > > > > > world, e.g. Beam published 22 packages in 2.28, Pandas published 16 > > > > > packages in 1.2.3 [2]. We could also publishing more packages if we > > > > > splitting the packages as the cost of adding another package will > be > > > very > > > > > small. > > > > > >>> > > > > > >>> Regards, > > > > > >>> Dian > > > > > >>> > > > > > >>> [1] https://pypi.org/project/apache-beam/#files < > > > > > https://pypi.org/project/apache-beam/#files> < > > > > > https://pypi.org/project/apache-beam/#files < > > > > > https://pypi.org/project/apache-beam/#files>> > > > > > >>> [2] https://pypi.org/project/pandas/#files > > > > > >>> > > > > > >>> > > > > > >>> Hi Xintong, > > > > > >>> > > > > > >>> Yes, you are right that there is 9 packages in 1.12 as we added > > > > Python > > > > > 3.8 support in 1.12. > > > > > >>> > > > > > >>> Regards, > > > > > >>> Dian > > > > > >>> > > > > > 2021年3月16日 下午7:45,Xintong Song 写道: > > > > > > > > > > And it's not only uploaded to PyPI, but