[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283159#comment-16283159 ] ASF GitHub Bot commented on FLINK-8220: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5134 @zentol putting this code in an external repository would make those benchmarks very fragile, since they depend on Flink internals. Almost any refactoring of Flink's network stack would break the external repository (of which most committers are not even aware) and would require coordinated commits, which would be unmaintainable. @greghogan hmmm, that's a very tempting option, however I'm not the one that must be convinced that such an option is legally viable. > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in the flink project and they will > be executed in the {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5134: [FLINK-8220] Implement set of network benchmarks
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5134 @zentol putting this code in an external repository would make those benchmarks very fragile, since they depend on Flink internals. Almost any refactoring of Flink's network stack would break the external repository (of which most committers are not even aware) and would require coordinated commits, which would be unmaintainable. @greghogan hmmm, that's a very tempting option, however I'm not the one that must be convinced that such an option is legally viable. ---
[GitHub] flink issue #5112: [FLINK-8175] remove flink-streaming-contrib and migrate i...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5112 Hi @tillrohrmann , per our discussion, I sent out an email to dev@flink, but it seems it didn't catch any attention. What do you think about this PR? ---
[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283113#comment-16283113 ] ASF GitHub Bot commented on FLINK-8175: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5112 Hi @tillrohrmann , per our discussion, I sent out an email to dev@flink, but it seems it didn't catch any attention. What do you think about this PR? > remove flink-streaming-contrib and migrate its classes to > flink-streaming-java/scala > > > Key: FLINK-8175 > URL: https://issues.apache.org/jira/browse/FLINK-8175 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > > I propose removing flink-streaming-contrib from flink-contrib, and migrating > its classes to flink-streaming-java/scala for the following reasons: > - flink-streaming-contrib is so small that it only has 4 classes (3 Java and > 1 Scala), and they don't need a dedicated jar for Flink to distribute and > maintain, or for users to deal with the overhead of dependency management > - the 4 classes in flink-streaming-contrib are logically more tied to > flink-streaming-java/scala, and thus can be easily migrated > - flink-contrib is already too crowded and noisy. It contains lots of sub > modules with different purposes which confuse developers and users, and they > lack a proper project hierarchy > To go a step even further, I would argue that even flink-contrib should be > removed and all its submodules should be migrated to other top-level modules > for the following reasons: 1) the whole Apache Flink project is itself a > result of contributions from many developers, so there's no reason to highlight > some contributions in a dedicated module named 'contrib' 2) flink-contrib > inherently doesn't have a good hierarchy to hold submodules
[GitHub] flink pull request #5138: [FLINK-8192] Properly annotate APIs of flink-conne...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5138 [FLINK-8192] Properly annotate APIs of flink-connector-kinesis ## What is the purpose of the change Properly annotate classes in flink-connector-kinesis ## Brief change log Properly annotate classes in flink-connector-kinesis ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: none ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8192 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5138.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5138 commit f1e9f8ac1323eaae2aa2a028ff1d03d7a2fbf658 Author: Bowen Li Date: 2017-12-07T08:02:16Z annotate classes commit e919aae934a0e9d7b65bb877acb64231e3b17120 Author: Bowen Li Date: 2017-12-08T06:12:23Z modify annotation commit f5d68ddba2ea0fb65bff9b300c5347cd8f7c35f5 Author: Bowen Li Date: 2017-12-08T06:16:34Z modify annotations ---
[jira] [Commented] (FLINK-8192) Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving / @Internal
[ https://issues.apache.org/jira/browse/FLINK-8192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283108#comment-16283108 ] ASF GitHub Bot commented on FLINK-8192: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5138 [FLINK-8192] Properly annotate APIs of flink-connector-kinesis ## What is the purpose of the change Properly annotate classes in flink-connector-kinesis ## Brief change log Properly annotate classes in flink-connector-kinesis ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: none ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8192 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5138.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5138 commit f1e9f8ac1323eaae2aa2a028ff1d03d7a2fbf658 Author: Bowen Li Date: 2017-12-07T08:02:16Z annotate classes commit e919aae934a0e9d7b65bb877acb64231e3b17120 Author: Bowen Li Date: 2017-12-08T06:12:23Z modify annotation commit f5d68ddba2ea0fb65bff9b300c5347cd8f7c35f5 Author: Bowen Li Date: 2017-12-08T06:16:34Z modify annotations > Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving > / @Internal > - > > Key: FLINK-8192 > URL: https://issues.apache.org/jira/browse/FLINK-8192 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.5.0 > > > Currently, the APIs of the Flink connectors have absolutely no annotations on > whether their usage is {{Public}} / {{PublicEvolving}} / or {{Internal}}.
> We have, for example, instances in the past where a user was mistakenly using > an abstract internal base class in the Elasticsearch connector. > This JIRA tracks the coverage of API usage annotation for all Flink shipped > connectors. Ideally, a separate subtask should be created for each individual > connector.
[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4991#discussion_r155708446 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -61,16 +80,26 @@ * @param heapMemoryInMB The size of the heap memory, in megabytes. * @param directMemoryInMB The size of the direct memory, in megabytes. * @param nativeMemoryInMB The size of the native memory, in megabytes. +* @param memoryForInputInMB The size of the memory for input, in megabytes. +* @param memoryForOutputInMB The size of the memory for output, in megabytes. */ public ResourceProfile( double cpuCores, int heapMemoryInMB, int directMemoryInMB, - int nativeMemoryInMB) { + int nativeMemoryInMB, + int memoryForInputInMB, + int memoryForOutputInMB, --- End diff -- I think a resource spec contains the resources users need to run their code, while a resource profile contains the resources for running a task. So a resource profile should also contain the part of the resources used by the Flink system. We divide this part into memoryForInputInMB and memoryForOutputInMB, and separate them from heap memory and direct memory so that different resource managers can choose different strategies. For example, a per-job resource manager needs all of these resources when allocating a task manager, but a session resource manager may not consider memoryForInputInMB and memoryForOutputInMB when assigning a slot, as this part is decided when the session cluster is created. Do you think it makes sense? ---
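The per-job vs. session distinction above can be sketched in plain Java. This is a hypothetical illustration only — the field names follow the diff, but the class, the helper methods, and all numbers are assumptions, not Flink's actual API:

```java
// Hypothetical sketch of a ResourceProfile that carries separate network
// memory fields; the two helper methods are illustrative, not Flink's API.
public class ResourceProfileSketch {
    final double cpuCores;
    final int heapMemoryInMB;
    final int directMemoryInMB;
    final int nativeMemoryInMB;
    final int memoryForInputInMB;   // memory for reading input, managed by the Flink system
    final int memoryForOutputInMB;  // memory for writing output, managed by the Flink system

    ResourceProfileSketch(double cpuCores, int heapMemoryInMB, int directMemoryInMB,
            int nativeMemoryInMB, int memoryForInputInMB, int memoryForOutputInMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.directMemoryInMB = directMemoryInMB;
        this.nativeMemoryInMB = nativeMemoryInMB;
        this.memoryForInputInMB = memoryForInputInMB;
        this.memoryForOutputInMB = memoryForOutputInMB;
    }

    // A per-job resource manager would size a new task manager from all parts.
    int taskManagerMemoryInMB() {
        return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB
                + memoryForInputInMB + memoryForOutputInMB;
    }

    // A session resource manager could ignore the input/output parts when
    // assigning a slot, since they are fixed when the session cluster starts.
    int slotMemoryInMB() {
        return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
    }
}
```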
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 @tillrohrmann , I agree with you that adding a builder looks better; I changed it according to your comments. Do you think it works now? ---
[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282937#comment-16282937 ] ASF GitHub Bot commented on FLINK-7878: --- Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 @tillrohrmann , I agree with you that adding a builder looks better; I changed it according to your comments. Do you think it works now? > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Currently, Flink only supports users defining how much CPU and memory an operator > uses, but the resources in a cluster are varied. For example, an application for > image processing may need GPU, some others may need FPGA. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make the ResourceSpec extensible.
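One way to make a resource spec extensible, combined with the builder mentioned in the PR discussion, is to keep CPU and memory as first-class fields and carry everything else (GPU, FPGA, ...) in a name-to-amount map. The sketch below is an assumption-laden illustration — the class name, builder API, and resource names are invented for this example, not Flink's actual ResourceSpec:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an extensible ResourceSpec with a builder; the API
// shown here is an illustration, not Flink's actual code.
public class ResourceSpecSketch {
    final double cpuCores;
    final int heapMemoryInMB;
    final Map<String, Double> extendedResources; // e.g. "GPU" -> 2.0, "FPGA" -> 1.0

    private ResourceSpecSketch(double cpuCores, int heapMemoryInMB,
            Map<String, Double> extendedResources) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.extendedResources = extendedResources;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static class Builder {
        private double cpuCores;
        private int heapMemoryInMB;
        private final Map<String, Double> extendedResources = new HashMap<>();

        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }
        Builder setHeapMemoryInMB(int mb) { this.heapMemoryInMB = mb; return this; }

        // New resource types can be added by name without changing the API.
        Builder addExtendedResource(String name, double units) {
            extendedResources.put(name, units);
            return this;
        }

        ResourceSpecSketch build() {
            return new ResourceSpecSketch(cpuCores, heapMemoryInMB, extendedResources);
        }
    }
}
```

The builder keeps the common case (CPU + memory) terse while the map absorbs new resource types without further signature changes.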
[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5133 [ERROR] src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java:[41,8] (imports) UnusedImports: Unused import: java.net.InetSocketAddress. ---
[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5133 @casidiablo please also describe why or how the removed code came to be unused. I see that `StandaloneHaServices#RESOURCE_MANAGER_RPC_ENDPOINT_NAME` was left unused by `433a345e`. ---
[jira] [Commented] (FLINK-8215) Collections codegen exception when constructing Array or Map via SQL API
[ https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282799#comment-16282799 ] Rong Rong commented on FLINK-8215: -- Agree. As long as the ValidationException is thrown for the Table API, we can go ahead and make the codegen support type widening by adding better type casts. Will go with option #2 then. > Collections codegen exception when constructing Array or Map via SQL API > > > Key: FLINK-8215 > URL: https://issues.apache.org/jira/browse/FLINK-8215 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > The Table API goes through `LogicalNode.validate()`, which brings up the > collection validation and rejects inconsistent types; this throws a > `ValidationException` for something like `array(1.0, 2.0f)`. > The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which uses Calcite SqlNode > validation, which supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS > DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception. > The root cause is that the code generation for these collection value constructors > does not cast or resolve the leastRestrictive type correctly. I see 2 options: > 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL. > 2. Make codegen support a leastRestrictive type cast, such as by using > `generateCast` instead of a direct cast like `(ClassType) element`.
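The failure mode can be reproduced outside Flink's codegen in plain Java (this is a standalone sketch of the problem, not Flink's generated code): a direct reference cast in the style of `(ClassType) element` throws on a boxed value of a sibling numeric type, while an explicit widening conversion — the analogue of generating a cast as in option #2 — succeeds:

```java
public class CastWideningDemo {
    // Mirrors codegen-style "(Double) element" on a boxed Float: this is a
    // reference cast between unrelated wrapper classes, so it throws.
    static boolean directCastFails(Object element) {
        try {
            Double ignored = (Double) element;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Mirrors an explicitly generated widening cast: convert via the numeric
    // value rather than via the wrapper type.
    static double widen(Object element) {
        return ((Number) element).doubleValue();
    }

    public static void main(String[] args) {
        Object f = 2.0f; // autoboxed java.lang.Float
        System.out.println(directCastFails(f)); // true
        System.out.println(widen(f));           // 2.0
    }
}
```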
[GitHub] flink pull request #5134: [FLINK-8220] Implement set of network benchmarks
Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5134#discussion_r155667757 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.types.LongValue; + +/** + * {@link ReceiverThread} that deserialize incoming messages. 
+ */ +public class SerializingLongReceiver extends ReceiverThread { + +private long maxLatency = Long.MIN_VALUE; +private long minLatency = Long.MAX_VALUE; +private long sumLatency; +private long sumLatencySquare; +private int numSamples; + +private final MutableRecordReader reader; + +public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) { +super(expectedRepetitionsOfExpectedRecord); +this.reader = new MutableRecordReader<>( +inputGate, +new String[]{ +EnvironmentInformation.getTemporaryFileDirectory() +}); +} + +protected void readRecords(long lastExpectedRecord) throws Exception { --- End diff -- Add `@Override`? ---
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282687#comment-16282687 ] ASF GitHub Bot commented on FLINK-8220: --- Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5134#discussion_r155667757 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.types.LongValue; + +/** + * {@link ReceiverThread} that deserialize incoming messages. 
+ */ +public class SerializingLongReceiver extends ReceiverThread { + +private long maxLatency = Long.MIN_VALUE; +private long minLatency = Long.MAX_VALUE; +private long sumLatency; +private long sumLatencySquare; +private int numSamples; + +private final MutableRecordReader reader; + +public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) { +super(expectedRepetitionsOfExpectedRecord); +this.reader = new MutableRecordReader<>( +inputGate, +new String[]{ +EnvironmentInformation.getTemporaryFileDirectory() +}); +} + +protected void readRecords(long lastExpectedRecord) throws Exception { --- End diff -- Add `@Override`? > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5072 @yew1eb have you looked at FLINK-6965? ---
[jira] [Commented] (FLINK-8223) Update Hadoop versions
[ https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282572#comment-16282572 ] ASF GitHub Bot commented on FLINK-8223: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5137 [FLINK-8223] [build] Update Hadoop versions ## What is the purpose of the change Update Hadoop minor versions for Flink 1.5 development cycle. ## Brief change log Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See http://hadoop.apache.org/releases.html ## Verifying this change This change is already covered by existing tests, such as Hadoop tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8223_update_hadoop_versions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5137 commit 34b58f6ce8b66ea8f3096a4792ba108b8df4dbcf Author: Greg Hogan Date: 2017-12-07T18:29:29Z [FLINK-8223] [build] Update Hadoop versions > Update Hadoop versions > -- > > Key: FLINK-8223 > URL: https://issues.apache.org/jira/browse/FLINK-8223 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > > Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See > http://hadoop.apache.org/releases.html
[GitHub] flink pull request #5137: [FLINK-8223] [build] Update Hadoop versions
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5137 [FLINK-8223] [build] Update Hadoop versions ## What is the purpose of the change Update Hadoop minor versions for Flink 1.5 development cycle. ## Brief change log Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See http://hadoop.apache.org/releases.html ## Verifying this change This change is already covered by existing tests, such as Hadoop tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8223_update_hadoop_versions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5137 commit 34b58f6ce8b66ea8f3096a4792ba108b8df4dbcf Author: Greg Hogan Date: 2017-12-07T18:29:29Z [FLINK-8223] [build] Update Hadoop versions ---
[jira] [Commented] (FLINK-8222) Update Scala version
[ https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282490#comment-16282490 ] ASF GitHub Bot commented on FLINK-8222: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5136 [FLINK-8222] [build] Update Scala version ## What is the purpose of the change This is an incremental upgrade to the Scala security release 2.11.12. "A privilege escalation vulnerability (CVE-2017-15288) has been identified in the Scala compilation daemon." https://www.scala-lang.org/news/security-update-nov17.html ## Brief change log Updated scala version in both parent `pom.xml` and in flink-quickstart-scala `pom.xml`. ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8222_update_scala_version Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5136.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5136 commit 7136e7f2def4c2d5a694729e3aefc1e4f54dfa22 Author: Greg Hogan Date: 2017-12-07T18:22:00Z [FLINK-8222] [build] Update Scala version This is an incremental upgrade to the Scala security release 2.11.12. > Update Scala version > > > Key: FLINK-8222 > URL: https://issues.apache.org/jira/browse/FLINK-8222 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Update Scala to version {{2.11.12}}. I don't believe this affects the Flink > distribution but rather anyone who is compiling Flink or a > Flink-quickstart-derived program on a shared system. > "A privilege escalation vulnerability (CVE-2017-15288) has been identified in > the Scala compilation daemon." > https://www.scala-lang.org/news/security-update-nov17.html
[GitHub] flink pull request #5136: [FLINK-8222] [build] Update Scala version
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5136 [FLINK-8222] [build] Update Scala version ## What is the purpose of the change This is an incremental upgrade to the Scala security release 2.11.12. "A privilege escalation vulnerability (CVE-2017-15288) has been identified in the Scala compilation daemon." https://www.scala-lang.org/news/security-update-nov17.html ## Brief change log Updated scala version in both parent `pom.xml` and in flink-quickstart-scala `pom.xml`. ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8222_update_scala_version Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5136.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5136 commit 7136e7f2def4c2d5a694729e3aefc1e4f54dfa22 Author: Greg Hogan Date: 2017-12-07T18:22:00Z [FLINK-8222] [build] Update Scala version This is an incremental upgrade to the Scala security release 2.11.12. ---
[jira] [Commented] (FLINK-8080) Remove need for "metrics.reporters"
[ https://issues.apache.org/jira/browse/FLINK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282444#comment-16282444 ] ASF GitHub Bot commented on FLINK-8080: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5099#discussion_r155618187 --- Diff: docs/monitoring/metrics.md --- @@ -329,11 +329,11 @@ or by assigning unique names to jobs and operators. Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These reporters will be instantiated on each job and task manager when they are started. -- `metrics.reporters`: The list of named reporters. - `metrics.reporter..`: Generic setting `` for the reporter named ``. - `metrics.reporter..class`: The reporter class to use for the reporter named ``. - `metrics.reporter..interval`: The reporter interval to use for the reporter named ``. - `metrics.reporter..scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named ``. +- `metrics.reporters`: (optional) An include list for reporters to instantiate. By default all configured reporters will be used. --- End diff -- "An include list for reporters" -> "A comma-separated list of reporter names"? Not sure if we need to specify "comma-separated". > Remove need for "metrics.reporters" > --- > > Key: FLINK-8080 > URL: https://issues.apache.org/jira/browse/FLINK-8080 > Project: Flink > Issue Type: Improvement > Components: Configuration, Metrics >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > Currently, in order to use a reporter one must configure something like this: > {code} > metrics.reporters: jmx > metrics.reporter.jmx.class: ... > {code} > It would be neat if users did not have to set {{metrics.reporters}}. 
We can > accomplish this by scanning the configuration for configuration keys > starting with {{metrics.reporter.}} and using the next word as a reporter > name.
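With the scanning approach described above, a configuration like the following would need no `metrics.reporters` key at all — both reporters would be discovered from their key prefixes. The reporter names (`jmx`, `grph`) and property values here are illustrative, a sketch of what the proposal enables rather than a documented configuration:

```yaml
# Both reporters are picked up from the "metrics.reporter.<name>." key prefix;
# no separate "metrics.reporters" list is required.
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.interval: 10 SECONDS
```

An explicit `metrics.reporters: jmx` line would then act as an include list, excluding `grph` despite its configured keys.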
[GitHub] flink pull request #5099: [FLINK-8080][metrics] Remove need for "metrics.rep...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5099#discussion_r155625966 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java --- @@ -76,8 +76,27 @@ public void testIsShutdown() { public void testReporterInstantiation() { Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); + + MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + assertTrue(metricRegistry.getReporters().size() == 1); + + Assert.assertTrue(TestReporter1.wasOpened); + + metricRegistry.shutdown(); + } + + /** +* Verifies that the reporter name list is correctly used to determine which reporters should be instantiated. +*/ + @Test + public void testReporterInclusion() { + Configuration config = new Configuration(); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); --- End diff -- Use a `TestReporter2` and verify not opened? Do we need both `testReporterInstantiation` and `testReporterInclusion`? ---
[GitHub] flink pull request #5099: [FLINK-8080][metrics] Remove need for "metrics.rep...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5099#discussion_r155620220 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java --- @@ -108,15 +118,36 @@ public static MetricRegistryConfiguration fromConfiguration(Configuration config delim = '.'; } - final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST); + Set includedReporters = reporterListPattern.splitAsStream(configuration.getString(MetricOptions.REPORTERS_LIST, "")) + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is useful for testing + Set namedReporters = new TreeSet<>(String::compareTo); + // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}.", reporterName); --- End diff -- Log the reason for excluding the reporter (not in the reporters list)? ---
[jira] [Commented] (FLINK-8080) Remove need for "metrics.reporters"
[ https://issues.apache.org/jira/browse/FLINK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282443#comment-16282443 ] ASF GitHub Bot commented on FLINK-8080: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5099#discussion_r155623170 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java --- @@ -44,7 +48,13 @@ private static volatile MetricRegistryConfiguration defaultConfiguration; // regex pattern to split the defined reporters - private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*"); + private static final Pattern reporterListPattern = Pattern.compile("\\s*,\\s*"); + + // regex pattern to extract the name from reporter configuration keys, e.g. "rep" from "metrics.reporter.rep.class" + private static final Pattern reporterClassPattern = Pattern.compile( + Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) + + "([\\S&&[^.]]*)\\." + --- End diff -- It would be helpful to document that we are intersecting regex character classes. > Remove need for "metrics.reporters" > --- > > Key: FLINK-8080 > URL: https://issues.apache.org/jira/browse/FLINK-8080 > Project: Flink > Issue Type: Improvement > Components: Configuration, Metrics >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > Currently, in order to use a reporter one must configure something like this: > {code} > metrics.reporters: jmx > metrics.reporter.jmx.class: ... > {code} > It would be neat if users did not have to set {{metrics.reporters}}. We can > accomplish this by scanning the configuration for configuration keys > starting with {{metrics.reporter.}} and using the next word as a reporter > name. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
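The character-class intersection in question can be demonstrated standalone. In Java regex, `&&` inside a character class intersects two classes: `\S` (any non-whitespace character) and `[^.]` (any character except a dot), so `[\S&&[^.]]` matches only characters that satisfy both. The snippet below is an illustration, not Flink code.

```java
import java.util.regex.Pattern;

public class CharClassIntersection {

    // "[\\S&&[^.]]" keeps only characters that are in BOTH classes:
    //   \S   -> non-whitespace
    //   [^.] -> anything but '.'
    // i.e. any non-whitespace character that is not a dot.
    private static final Pattern NAME_CHARS = Pattern.compile("[\\S&&[^.]]+");

    public static boolean isValidNameSegment(String s) {
        return NAME_CHARS.matcher(s).matches();
    }
}
```

This is why the pattern stops capturing at the first dot of a key like `metrics.reporter.rep.class`, yielding just `rep`.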
[GitHub] flink pull request #5135: [hotfix] [doc] Fix typo in TaskManager and Environ...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5135#discussion_r155627163 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -585,7 +585,7 @@ class TaskManager( config.getMaxRegistrationPause().toMilliseconds, TimeUnit.MILLISECONDS)) -// schedule (with our timeout s delay) a check triggers a new registration +// schedule (with our timeout's delay) a check triggers a new registration --- End diff -- "triggers" -> "to trigger"? Or rewrite like "schedule a check to trigger a new registration attempt if not registered by the timeout"? ---
[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4
[ https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282466#comment-16282466 ] ASF GitHub Bot commented on FLINK-7984: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5072 @yew1eb have you looked at FLINK-6965? > Bump snappy-java to 1.1.4 > - > > Key: FLINK-7984 > URL: https://issues.apache.org/jira/browse/FLINK-7984 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > Upgrade the snappy java version to 1.1.4(the latest, May, 2017). The older > version has some issues like memory leak > (https://github.com/xerial/snappy-java/issues/91). > Snappy Java [Release > Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5135: [hotfix] [doc] Fix typo in TaskManager and Environ...
GitHub user casidiablo opened a pull request: https://github.com/apache/flink/pull/5135 [hotfix] [doc] Fix typo in TaskManager and EnvironmentInformation doc You can merge this pull request into a Git repository by running: $ git pull https://github.com/casidiablo/flink hotfix/comments-typo-tm Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5135 commit 9f59d9bd9dea0f95aed788c1b2a5a20a995ca07c Author: Cristian Date: 2017-12-07T19:00:00Z [hotfix] [doc] Fix typo in TaskManager and EnvironmentInformation doc ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155604499 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; --- End diff -- The variable name is confusing. `multiTaskSlotFuture` is not of type `Future`. ---
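The `computeIfAbsent` call in the diff above lazily creates one `SlotSharingManager` per slot-sharing group id and reuses it on later requests. The snippet below shows that pattern in isolation; `SlotSharingManager` here is a simplified stand-in for Flink's class, not its real API.

```java
import java.util.HashMap;
import java.util.Map;

public class PerGroupManagers {

    // Simplified stand-in for Flink's SlotSharingManager.
    public static final class SlotSharingManager {
        private final String groupId;
        public SlotSharingManager(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
    }

    private final Map<String, SlotSharingManager> managers = new HashMap<>();

    // First request for a group id constructs the manager; later
    // requests for the same id return the same instance.
    public SlotSharingManager managerFor(String groupId) {
        return managers.computeIfAbsent(groupId, SlotSharingManager::new);
    }
}
```

The same-instance guarantee is what lets all tasks of one sharing group route through a single manager.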
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282300#comment-16282300 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155605251 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java --- @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * A logical slot represents a resource on a TaskManager into + * which a single task can be deployed. 
+ */ +public interface LogicalSlot { + +Payload TERMINATED_PAYLOAD = new Payload() { + + private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null); --- End diff -- nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because is not actually a constant (not static). > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8222) Update Scala version
Greg Hogan created FLINK-8222: - Summary: Update Scala version Key: FLINK-8222 URL: https://issues.apache.org/jira/browse/FLINK-8222 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.4.0 Reporter: Greg Hogan Assignee: Greg Hogan Update Scala to version {{2.11.12}}. I don't believe this affects the Flink distribution but rather anyone who is compiling Flink or a Flink-quickstart-derived program on a shared system. "A privilege escalation vulnerability (CVE-2017-15288) has been identified in the Scala compilation daemon." https://www.scala-lang.org/news/security-update-nov17.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8223) Update Hadoop versions
Greg Hogan created FLINK-8223: - Summary: Update Hadoop versions Key: FLINK-8223 URL: https://issues.apache.org/jira/browse/FLINK-8223 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.5.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See http://hadoop.apache.org/releases.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155590317 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( --- End diff -- nit: variable name should be *leaf* https://www.dict.cc/?s=leaf ---
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282181#comment-16282181 ] ASF GitHub Bot commented on FLINK-8220: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5134 Or if `flink-benchmarks` does not need to be distributed then that code could be contributed to the main Flink repository. > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5047: Code refine of WordWithCount
Github user harborl commented on the issue: https://github.com/apache/flink/pull/5047 Yes, after some learning it turns out that, in this project, a POJO is a reasonable style for parameters. Sorry for the interruption; I am excited to learn more. ---
[jira] [Commented] (FLINK-7129) Dynamically changing patterns
[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282185#comment-16282185 ] Fabian Hueske commented on FLINK-7129: -- It won't go into Flink 1.4.0 because the feature freeze was more than 4 weeks ago and the release is currently being voted on. I don't know about plans for 1.5.0. > Dynamically changing patterns > - > > Key: FLINK-7129 > URL: https://issues.apache.org/jira/browse/FLINK-7129 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > > An umbrella task for introducing a mechanism for injecting patterns through > coStream -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282080#comment-16282080 ] ASF GitHub Bot commented on FLINK-8220: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5134 Why do you want to add these to flink when they are only used by an external repository? Wouldn't it make sense to consolidate all benchmarking code to the flink-benchmark repo? > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281998#comment-16281998 ] ASF GitHub Bot commented on FLINK-8220: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5134 [FLINK-8220] Implement set of network benchmarks This PR implements a set of network benchmarks intended to simulate various workloads for the network stack. The benchmarks will be executed by the `flink-benchmarks` project. We want to keep the code of the benchmarks here, since the components they use are not part of the public API and we don't want them breaking unknowingly on changes in the Flink code. ## Verifying this change This change adds two tests ensuring that the benchmarks compile and execute as expected without exceptions. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink network-benchmarks Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5134.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5134 commit 5151d74980fd5995c718173ea03db5049ed0ada2 Author: Piotr Nowojski Date: 2017-11-28T15:49:37Z [FLINK-8172][network] Write to memorySegment directly in RecordSerializer This increases the throughput of the network stack by a factor of 2: previously the method getMemorySegment() was called twice per record, and it is a method synchronized on recycleLock, even though the RecordSerializer is the sole owner of the Buffer at this point, so synchronization is not needed. commit 42a6a0daefdb9e9b1185a8195c28eccf055a83ce Author: Piotr Nowojski Date: 2017-11-28T07:41:40Z [hotfix][test] Add timeout for joining with CheckedThread commit b777e4f44344f12d0196c77515aac0815e7f31c5 Author: Piotr Nowojski Date: 2017-11-28T07:42:19Z [hotfix][util] Added suppressExceptions for lambda functions commit 28e25a2bd0af704af1fa1b084be6b0cfa05d9928 Author: Piotr Nowojski Date: 2017-12-07T09:03:32Z [FLINK-XXX][network-benchmarks] Define network benchmarks in Flink project commit 4f88c6522e176bc20ae53c67136e53dc7970c9e0 Author: Nico Kruber Date: 2017-12-07T09:03:49Z [FLINK-XXX][network-benchmarks] Define latency network benchmarks in Flink project > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. 
> Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
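The optimization described in commit 5151d74 above can be illustrated with a standalone sketch. The class and method names below are simplified stand-ins, not Flink's actual RecordSerializer or Buffer API: caching the segment reference once per buffer avoids re-entering the synchronized getter on every record.

```java
import java.nio.ByteBuffer;

// Simplified stand-in for a network buffer whose segment getter is
// synchronized (on the buffer itself here, standing in for recycleLock).
class BufferSketch {
    private final ByteBuffer segment = ByteBuffer.allocate(1024);
    int lockAcquisitions = 0;

    synchronized ByteBuffer getMemorySegment() {
        lockAcquisitions++;
        return segment;
    }
}

class RecordSerializerSketch {
    // Old path: two synchronized lookups per record.
    static int serializeNaive(BufferSketch buffer, byte[][] records) {
        for (byte[] record : records) {
            buffer.getMemorySegment();               // capacity check, lookup #1
            buffer.getMemorySegment().put(record);   // write, lookup #2
        }
        return buffer.lockAcquisitions;
    }

    // New path: the serializer is the sole owner of the buffer at this
    // point, so it can safely cache the segment and write directly.
    static int serializeCached(BufferSketch buffer, byte[][] records) {
        ByteBuffer segment = buffer.getMemorySegment(); // single lookup
        for (byte[] record : records) {
            segment.put(record);
        }
        return buffer.lockAcquisitions;
    }
}
```

With three records, the naive path acquires the lock six times while the cached path acquires it once, which is the effect the commit message attributes the 2x throughput gain to.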
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281986#comment-16281986 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155549755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java --- @@ -32,6 +34,20 @@ */ public interface LogicalSlot { +Payload TERMINATED_PAYLOAD = new Payload() { + + private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null); --- End diff -- nit: `COMPLETED_TERMINATION_FUTURE` should be camel-cased because it is not actually a constant (it is not static). > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
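The naming nit can be illustrated in isolation. The `Payload` interface and its method below are minimal hypothetical stand-ins, not Flink's actual `LogicalSlot.Payload` API: a field of an anonymous class is an instance field, so Java conventions call for camelCase rather than CONSTANT_CASE.

```java
import java.util.concurrent.CompletableFuture;

interface LogicalSlotSketch {
    // Hypothetical stand-in for the real payload contract.
    interface Payload {
        CompletableFuture<Void> getTerminationFuture();
    }

    // An interface field is implicitly static final, so the CONSTANT_CASE
    // name TERMINATED_PAYLOAD is appropriate here ...
    Payload TERMINATED_PAYLOAD = new Payload() {
        // ... but this is an instance field of the anonymous class, not a
        // constant, hence camelCase per the review comment.
        private final CompletableFuture<Void> completedTerminationFuture =
                CompletableFuture.completedFuture(null);

        @Override
        public CompletableFuture<Void> getTerminationFuture() {
            return completedTerminationFuture;
        }
    };
}
```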
[jira] [Created] (FLINK-8221) Implement set of network latency benchmarks in Flink
Piotr Nowojski created FLINK-8221: - Summary: Implement set of network latency benchmarks in Flink Key: FLINK-8221 URL: https://issues.apache.org/jira/browse/FLINK-8221 Project: Flink Issue Type: New Feature Components: Network Reporter: Piotr Nowojski Assignee: Nico Kruber -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-8220: -- Summary: Implement set of network throughput benchmarks in Flink (was: Define set of network throughput benchmarks in Flink) > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8220) Define set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-8220: -- Description: Benchmarks should be defined and implemented in flink project and they will be executed in {{flink-benchmarks}} project. Configurable parameters: number of record writers and number of channels. > Define set of network throughput benchmarks in Flink > > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
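The two configurable parameters named in the issue description can be sketched with a tiny standalone harness. This is illustrative only: the real benchmarks are driven from the flink-benchmarks project and exercise Flink's actual record writers and network channels, while the class below only demonstrates the (writers, channels) parameterization.

```java
// Each of N writers emits records round-robin across M channels;
// the harness counts how many records land on each channel.
class NetworkThroughputHarness {
    static long[] run(int numWriters, int numChannels, int recordsPerWriter) {
        long[] recordsPerChannel = new long[numChannels];
        for (int writer = 0; writer < numWriters; writer++) {
            for (int record = 0; record < recordsPerWriter; record++) {
                int channel = (writer + record) % numChannels; // round-robin selection
                recordsPerChannel[channel]++;
            }
        }
        return recordsPerChannel;
    }
}
```

For example, 2 writers sending 100 records each across 4 channels distribute 200 records evenly, 50 per channel.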
[jira] [Created] (FLINK-8220) Define set of network throughput benchmarks in Flink
Piotr Nowojski created FLINK-8220: - Summary: Define set of network throughput benchmarks in Flink Key: FLINK-8220 URL: https://issues.apache.org/jira/browse/FLINK-8220 Project: Flink Issue Type: New Feature Components: Network Reporter: Piotr Nowojski Assignee: Piotr Nowojski -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8078) Decouple Execution from actual slot implementation
[ https://issues.apache.org/jira/browse/FLINK-8078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281918#comment-16281918 ] ASF GitHub Bot commented on FLINK-8078: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5086 LGTM > Decouple Execution from actual slot implementation > -- > > Key: FLINK-8078 > URL: https://issues.apache.org/jira/browse/FLINK-8078 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to plug in a different slot implementation, we should introduce a > slot interface which abstracts away the implementation details of > {{SimpleSlot}} wrt {{Execution}}. The reason this is necessary is to provide > a simpler slot implementation for Flip-6 since all allocation/release logic > will go through the {{SlotPool}}. Thus, we no longer need the concurrent > structure of {{Slot}}, {{SharedSlot}}, {{SimpleSlot}} and > {{SlotSharingGroupAssignment}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
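The decoupling idea in FLINK-8078 can be sketched as follows. Method names are illustrative, not Flink's actual LogicalSlot interface: the Execution programs against a narrow slot abstraction, so both a SimpleSlot-like implementation and a Flip-6 SlotPool-backed slot can satisfy it without the Execution knowing the difference.

```java
class ExecutionSketch {
    // Narrow abstraction: only what an Execution needs from a slot.
    interface LogicalSlot {
        boolean isAlive();
        void release();
    }

    private final LogicalSlot slot;

    ExecutionSketch(LogicalSlot slot) {
        this.slot = slot;
    }

    // Deployment only proceeds into a live slot.
    boolean deploy() {
        return slot.isAlive();
    }

    void cancel() {
        slot.release();
    }

    // Trivial implementation standing in for SimpleSlot or a Flip-6 slot.
    static class TestSlot implements LogicalSlot {
        private boolean alive = true;
        @Override public boolean isAlive() { return alive; }
        @Override public void release() { alive = false; }
    }
}
```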
[jira] [Created] (FLINK-8219) Kinesis Connector metrics
Gary Oslon created FLINK-8219: - Summary: Kinesis Connector metrics Key: FLINK-8219 URL: https://issues.apache.org/jira/browse/FLINK-8219 Project: Flink Issue Type: Bug Components: Kinesis Connector Reporter: Gary Oslon I don't see in the documentation which metrics are emitted by the Kinesis Connector. When working with Amazon's Kinesis Client library, it is common to get metrics via {{MillisBehindLatest}}, which tells you whether your processor is delayed. Where are those metrics being published? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
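For illustration, a MillisBehindLatest-style value is typically exposed as a gauge that the record-fetching thread updates with the value reported per GetRecords response, while a metrics reporter reads it. The sketch below is dependency-free and hypothetical; whether and where the Flink Kinesis connector actually publishes this metric is exactly the open question in the issue.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a MillisBehindLatest-style gauge: one thread
// (the fetcher) writes the latest reported lag, another (the metrics
// reporter) reads it; AtomicLong keeps the handoff safe.
class MillisBehindLatestGauge {
    private final AtomicLong millisBehindLatest = new AtomicLong(0L);

    // Called with the lag reported in each GetRecords response.
    void update(long reportedMillisBehind) {
        millisBehindLatest.set(reportedMillisBehind);
    }

    long getValue() {
        return millisBehindLatest.get();
    }
}
```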
[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method
Github user yew1eb commented on the issue: https://github.com/apache/flink/pull/5133 LGTM. ---
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281885#comment-16281885 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155520946 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released --- End diff -- nit: All fields are commented with non-javadoc comments. Normally comments on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on fields are displayed by IntelliJ (`Ctrl + J`). > Add support for scheduling with slot sharing > > > Key:
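The comment-style nit above amounts to the following (field names are illustrative): Javadoc comments on fields, unlike plain line comments, are picked up by IDE quick-documentation such as IntelliJ's Ctrl + J.

```java
class FieldCommentStyle {
    // Plain line comment: not shown by IDE quick-documentation popups.
    private final Object slotOwner = new Object();

    /** Javadoc-style field comment: displayed by IntelliJ quick docs. */
    private final Object allocatedSlotActions = new Object();

    Object owner() { return slotOwner; }
    Object actions() { return allocatedSlotActions; }
}
```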
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281883#comment-16281883 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155519870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final MapallTaskSlots; + + // Root nodes which have not been completed because the
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281884#comment-16281884 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155528224 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final MapallTaskSlots; + + // Root nodes which have not been completed because the
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155519870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final MapallTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155528224 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows running different + * tasks in the same slot and realizing co-location constraints. + * + * The SlotSharingManager allows creating a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and an {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leaf nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which has a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by an {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. All co-located tasks will then be added to this co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map<SlotRequestId, TaskSlot> allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final
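For readers following the review, the two-level structure described in the quoted javadoc can be sketched as follows. This is a hypothetical simplification — plain strings stand in for `SlotRequestId`/`AbstractID` and `Object` for the child slots — not the actual Flink classes:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch (not the actual Flink classes) of the TaskSlot hierarchy
// described in the javadoc above: an inner MultiTaskSlot accepts at most one
// child per group id, which enforces "one task per vertex per shared slot".
public class SlotHierarchySketch {

    static class MultiTaskSlot {
        // children keyed by the AbstractID-like id of the task / co-location constraint
        private final Map<String, Object> children = new HashMap<>();

        /** Returns true iff no child with the same group id exists yet. */
        boolean allocateChild(String groupId, Object child) {
            return children.putIfAbsent(groupId, child) == null;
        }
    }

    public static void main(String[] args) {
        MultiTaskSlot root = new MultiTaskSlot();

        // two different tasks of the same sharing group fit into one slot
        System.out.println(root.allocateChild("map-vertex", "singleTaskSlot1"));  // true
        System.out.println(root.allocateChild("sink-vertex", "singleTaskSlot2")); // true

        // a second subtask of the same vertex must go to another root slot
        System.out.println(root.allocateChild("map-vertex", "singleTaskSlot3"));  // false
    }
}
```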
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155520946 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows running different + * tasks in the same slot and realizing co-location constraints. + * + * The SlotSharingManager allows creating a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and an {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leaf nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which has a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by an {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. All co-located tasks will then be added to this co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released --- End diff -- nit: All fields are commented with non-javadoc comments. Normally comments on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on fields are displayed by IntelliJ (`Ctrl + J`). ---
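The reviewer's nit about field comments is easy to see in a toy class. This is an illustrative example (the class and field names are made up, not Flink code):

```java
// Illustrates the review nit: a Javadoc-style comment on a field is picked up
// by IDE quick documentation, a plain line comment is not.
public class FieldCommentStyle {

    // non-javadoc comment: not shown by the IDE's quick-documentation popup
    private final int legacyStyle = 1;

    /** Javadoc comment: displayed by IntelliJ's quick documentation. */
    private final int preferredStyle = 2;

    public int sum() {
        return legacyStyle + preferredStyle;
    }

    public static void main(String[] args) {
        System.out.println(new FieldCommentStyle().sum()); // prints 3
    }
}
```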
[jira] [Commented] (FLINK-8124) EventTimeTrigger (and other triggers) could have less specific generic types
[ https://issues.apache.org/jira/browse/FLINK-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281876#comment-16281876 ] ASF GitHub Bot commented on FLINK-8124: --- Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/5073 > I thought this was mainly about making the time triggers more relaxed so that they accept a Window in addition to only TimeWindow. That's exactly what this PR is about. > EventTimeTrigger (and other triggers) could have less specific generic types > > > Key: FLINK-8124 > URL: https://issues.apache.org/jira/browse/FLINK-8124 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Cristian >Priority: Minor > > When implementing custom WindowAssigners, one may need different > implementations of the {{Window}} class (other than {{TimeWindow}}). In such > cases, it is not possible to use the existing triggers (e.g. > {{EventTimeTrigger}}) because they extend from {{Trigger
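The generics relaxation under discussion can be illustrated with hypothetical stand-ins for Flink's `Trigger`/`Window` types (the real `Trigger` also carries an element type parameter, omitted here for brevity):

```java
// Minimal sketch (hypothetical stand-ins, not Flink's actual classes) of why
// a trigger declared against TimeWindow cannot be reused with a custom Window
// subclass, and how a relaxed bound fixes it.
public class TriggerGenericsSketch {

    static class Window {}
    static class TimeWindow extends Window {}
    static class CountWindow extends Window {} // custom window from a custom WindowAssigner

    // trigger parameterized on the window type
    interface Trigger<W extends Window> {
        boolean onElement(W window);
    }

    // before: fixed to TimeWindow -- unusable with CountWindow assigners
    static class StrictEventTimeTrigger implements Trigger<TimeWindow> {
        public boolean onElement(TimeWindow window) { return true; }
    }

    // after: accepts any Window, which is what the PR relaxes the bound to
    static class RelaxedEventTimeTrigger implements Trigger<Window> {
        public boolean onElement(Window window) { return true; }
    }

    public static void main(String[] args) {
        Trigger<Window> trigger = new RelaxedEventTimeTrigger();
        System.out.println(trigger.onElement(new CountWindow())); // works for custom windows
        // Trigger<Window> bad = new StrictEventTimeTrigger();    // would not compile
    }
}
```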
[GitHub] flink pull request #5133: [hotfix] Fix typo in AkkaUtils method
GitHub user casidiablo opened a pull request: https://github.com/apache/flink/pull/5133 [hotfix] Fix typo in AkkaUtils method Also, removed unused code: - `StandaloneHaServices#RESOURCE_MANAGER_RPC_ENDPOINT_NAME` - AkkaRpcServiceUtils#createInetSocketAddressFromAkkaURL() You can merge this pull request into a Git repository by running: $ git pull https://github.com/casidiablo/flink hotfix/fix-typo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5133 ---
[jira] [Commented] (FLINK-8122) Name all table sinks and sources
[ https://issues.apache.org/jira/browse/FLINK-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281851#comment-16281851 ] ASF GitHub Bot commented on FLINK-8122: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5068 Thanks for the review @fhueske. * I added the `explainSink()` method to make the `TableSink` consistent with the `TableSource`. Discarding this method and using the `TableConnectorUtil.genRuntimeName()` in each table sink should also be OK. * Yes, the test sources and sinks are not exposed. I'll revert them. * The source name was set via this call `execEnv.createInput(orcIF).name(explainSource())`. The `createInput()` method for a **mocked** `execEnv` will return `null` and cause an NPE. Thanks, Xingcan > Name all table sinks and sources > > > Key: FLINK-8122 > URL: https://issues.apache.org/jira/browse/FLINK-8122 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Not all table sinks and sources have proper names. Therefore, they are > displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should > add names for all built-in connectors. Having information about the table > sink name (via {{INSERT INTO}}) would be even better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"
[ https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281834#comment-16281834 ] dongtingting commented on FLINK-8093: - [~aljoscha] I think the client id counter is static and thread safe. But one TaskManager may have multiple slots, and different slots use different environments and KafkaProducer classes. So one TaskManager may end up with multiple identical client ids, while the metrics are registered with the JMX MBean server, of which there is only one per JVM. The identical client ids then conflict when they are registered with the same MBean server. We fixed this problem by setting, in user code: properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-" + topic + timestamp); This avoids the conflict. In addition, we want to modify the Flink code to avoid the conflict more generally. > flink job fail because of kafka producer create fail of > "javax.management.InstanceAlreadyExistsException" > - > > Key: FLINK-8093 > URL: https://issues.apache.org/jira/browse/FLINK-8093 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 > Environment: flink 1.3.2, kafka 0.9.1 >Reporter: dongtingting >Priority: Critical > > One TaskManager has multiple task slots; one task failed because creating the > KafkaProducer failed. The reason for the creation failure is > "javax.management.InstanceAlreadyExistsException: > kafka.producer:type=producer-metrics,client-id=producer-3". The detailed trace > is: > 2017-11-04 19:41:23,281 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Filter -> Map -> Filter -> Sink: > dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from > RUNNING to FAILED. 
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer > at > org.apache.kafka.clients.producer.KafkaProducer.&lt;init&gt;(KafkaProducer.java:321) > at > org.apache.kafka.clients.producer.KafkaProducer.&lt;init&gt;(KafkaProducer.java:181) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.common.KafkaException: Error registering mbean > kafka.producer:type=producer-metrics,client-id=producer-3 > at > org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159) > at > org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77) > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288) > at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255) > at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.&lt;init&gt;(RecordAccumulator.java:111) > at > org.apache.kafka.clients.producer.KafkaProducer.&lt;init&gt;(KafkaProducer.java:261) > ... 9 more > Caused by: javax.management.InstanceAlreadyExistsException: > kafka.producer:type=producer-metrics,client-id=producer-3 > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157) > ... 16 more > I suspect that tasks in different task slots of one TaskManager use different > classloaders, and the task id may be the same within one process. So this leads to > the KafkaProducer creation failing in one
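The workaround the commenter describes can be sketched like this. The helper name and the bootstrap address are placeholders, and the client-id scheme follows the comment above rather than any official Flink API; the key point is that a unique `client.id` per producer instance avoids colliding JMX MBean names:

```java
import java.util.Properties;

// Sketch of the workaround from the comment above: give every producer
// instance a unique client.id so the JMX MBean names
// (kafka.producer:type=producer-metrics,client-id=...) no longer collide
// across task slots of the same TaskManager.
public class UniqueClientId {

    // hypothetical helper; "bootstrap.servers" value is a placeholder
    static Properties producerProperties(String topic, long timestamp) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // "client.id" is the key behind ProducerConfig.CLIENT_ID_CONFIG
        properties.setProperty("client.id", "producer-" + topic + "-" + timestamp);
        return properties;
    }

    public static void main(String[] args) {
        Properties p1 = producerProperties("dp_client_log", 1509795683281L);
        Properties p2 = producerProperties("dp_client_log", 1509795683282L);
        // distinct client ids => distinct MBean names, no InstanceAlreadyExistsException
        System.out.println(!p1.getProperty("client.id").equals(p2.getProperty("client.id")));
    }
}
```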
[GitHub] flink pull request #5052: [FLINK-8133][REST][docs] Generate REST API documen...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5052 ---
[jira] [Commented] (FLINK-8133) Generate documentation for new REST API
[ https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281796#comment-16281796 ] ASF GitHub Bot commented on FLINK-8133: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5052 > Generate documentation for new REST API > --- > > Key: FLINK-8133 > URL: https://issues.apache.org/jira/browse/FLINK-8133 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8133) Generate documentation for new REST API
[ https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8133. --- Resolution: Fixed master: 610fde722fc91ff760cff3865564a51a4b945f19 > Generate documentation for new REST API > --- > > Key: FLINK-8133 > URL: https://issues.apache.org/jira/browse/FLINK-8133 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281780#comment-16281780 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155507607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; --- End diff -- nit: wrong import order (not sorted lexicographically) ``` import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; ``` items should appear before `LogicalSlot` > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155507294 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows running different + * tasks in the same slot and realizing co-location constraints. + * + * The SlotSharingManager allows creating a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and an {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leaf nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which has a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by an {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. All co-located tasks will then be added to this co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map<SlotRequestId, TaskSlot> allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final
[jira] [Commented] (FLINK-7684) Avoid multiple data copies in MergingWindowSet
[ https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281775#comment-16281775 ] ASF GitHub Bot commented on FLINK-7684: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4723 I think this needs a lot more description of what is happening. For example, this introduces a prominent new configuration setting on the `ExecutionConfig` that is nowhere described, making it hard to review this. The javadocs on the `OptimizationTarget` are minimal, and no other docs have been added. As a general thought: I am very skeptical about adding such a new setting, I would actually like us to go the opposite way and reduce the number of knobs further and further over time. Unless there are vast differences in the behavior, I find that an opinionated good implementation or choice of technique is better for users than offering a knob. > Avoid multiple data copies in MergingWindowSet > -- > > Key: FLINK-7684 > URL: https://issues.apache.org/jira/browse/FLINK-7684 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Currently MergingWindowSet uses ListState of tuples to persist its mapping. > This is inefficient because this ListState of tuples must be converted to a > HashMap on each access. > Furthermore, for some cases it might be inefficient to check whether the mapping > has changed before saving it on state. > Those two issues are causing multiple data copies and constructing multiple > Lists/Maps for each processed element, which is a reason for noticeable > performance issues. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
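The cost pattern the ticket describes — rebuilding a HashMap from a persisted list of tuples on every access, versus keeping the map in memory and writing back only on change — can be illustrated with a minimal sketch. The types below are illustrative stand-ins, not Flink's `MergingWindowSet` or `ListState`:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrates the inefficiency from the ticket: a mapping persisted as a
// list of (window, stateWindow) pairs must be rebuilt into a map on access.
public class MappingSketch {

    // Persisted form: what a ListState of tuples roughly looks like.
    static List<SimpleEntry<String, String>> persisted = new ArrayList<>();

    // Inefficient: converts the list to a fresh HashMap on *every* access,
    // i.e. one full copy per processed element.
    static Map<String, String> rebuildEveryAccess() {
        Map<String, String> m = new HashMap<>();
        for (SimpleEntry<String, String> e : persisted) {
            m.put(e.getKey(), e.getValue());
        }
        return m;
    }

    // Cheaper alternative: keep the map in memory, track a dirty flag,
    // and write back to the persisted form only when something changed.
    static Map<String, String> cached = new HashMap<>();
    static boolean dirty = false;

    static void put(String window, String stateWindow) {
        cached.put(window, stateWindow);
        dirty = true;
    }

    static void persistIfChanged() {
        if (dirty) {
            persisted.clear();
            cached.forEach((k, v) -> persisted.add(new SimpleEntry<>(k, v)));
            dirty = false;
        }
    }

    public static void main(String[] args) {
        persisted.add(new SimpleEntry<>("w1", "s1"));
        cached.putAll(rebuildEveryAccess()); // one initial load

        System.out.println(rebuildEveryAccess().get("w1")); // s1, via a full copy

        put("w2", "s2");
        persistIfChanged(); // one write-back instead of a copy per element
        System.out.println(persisted.size()); // 2
    }
}
```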
[jira] [Commented] (FLINK-7812) Log system resources as metrics
[ https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281768#comment-16281768 ] ASF GitHub Bot commented on FLINK-7812: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4801#discussion_r155503740 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java --- @@ -102,37 +103,16 @@ public static void instantiateStatusMetrics( private static void instantiateNetworkMetrics( MetricGroup metrics, final NetworkEnvironment network) { - metrics.gauge("TotalMemorySegments", new Gauge<Long>() { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); - } - }); - metrics.<Long>gauge("AvailableMemorySegments", new Gauge<Long>() { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); - } - }); + final NetworkBufferPool networkBufferPool = network.getNetworkBufferPool(); + metrics.<Long>gauge("TotalMemorySegments", () -> (long) networkBufferPool.getTotalNumberOfMemorySegments()); --- End diff -- Replace with "Integer" Gauge and change to method reference? > Log system resources as metrics > --- > > Key: FLINK-7812 > URL: https://issues.apache.org/jira/browse/FLINK-7812 > Project: Flink > Issue Type: New Feature > Components: Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
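Stephan's suggestion — an Integer gauge bound via method reference instead of an anonymous `Gauge<Long>` with an explicit cast — would look roughly like the sketch below. The `Gauge`/`MetricGroup` interfaces are reduced to minimal stand-ins so the example is self-contained; the real ones live in flink-metrics-core:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for Flink's Gauge/MetricGroup so the sketch compiles
// on its own; only the shape of the call matters here.
public class GaugeSketch {

    @FunctionalInterface
    interface Gauge<T> {
        T getValue();
    }

    static class MetricGroup {
        final Map<String, Gauge<?>> gauges = new HashMap<>();

        <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
            gauges.put(name, gauge);
            return gauge;
        }
    }

    // Stand-in for NetworkBufferPool with an int-valued counter.
    static class BufferPool {
        int getTotalNumberOfMemorySegments() { return 1024; }
    }

    public static void main(String[] args) {
        MetricGroup metrics = new MetricGroup();
        BufferPool pool = new BufferPool();

        // The review suggestion: an Integer gauge via method reference —
        // no anonymous class, no (long) cast, boxing handles int -> Integer.
        Gauge<Integer> total =
            metrics.gauge("TotalMemorySegments", pool::getTotalNumberOfMemorySegments);

        System.out.println(total.getValue()); // 1024
    }
}
```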
[jira] [Commented] (FLINK-7812) Log system resources as metrics
[ https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281767#comment-16281767 ] ASF GitHub Bot commented on FLINK-7812: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4801#discussion_r155503385 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/SystemResourcesCounter.java --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.utils; + +import org.apache.flink.api.common.time.Time; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.CentralProcessor.TickType; +import oshi.hardware.HardwareAbstractionLayer; +import oshi.hardware.NetworkIF; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Daemon thread logging system resources. 
+ * + * To accurately and consistently report CPU and network usage we have to periodically probe + * CPU ticks and network sent/received bytes and then convert those values to CPU usage and + * send/receive byte rates. + */ +@ThreadSafe --- End diff -- A Thread is ThreadSafe? ;-) > Log system resources as metrics > --- > > Key: FLINK-7812 > URL: https://issues.apache.org/jira/browse/FLINK-7812 > Project: Flink > Issue Type: New Feature > Components: Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
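The probing scheme described in the Javadoc — periodically sample monotonically increasing counters (CPU ticks, sent/received bytes) and derive rates from successive samples — can be sketched as follows. The names are illustrative, not the `SystemResourcesCounter` API:

```java
// Sketch of deriving a byte rate from two samples of a monotonically
// increasing counter, as the probing thread described above would do.
public class RateSketch {

    private long lastBytes;
    private long lastTimestampMillis;
    private double bytesPerSecond;

    RateSketch(long initialBytes, long initialTimestampMillis) {
        this.lastBytes = initialBytes;
        this.lastTimestampMillis = initialTimestampMillis;
    }

    // Called once per probe interval with the current counter reading.
    void update(long currentBytes, long nowMillis) {
        long deltaBytes = currentBytes - lastBytes;
        long deltaMillis = nowMillis - lastTimestampMillis;
        if (deltaMillis > 0) {
            bytesPerSecond = deltaBytes * 1000.0 / deltaMillis;
        }
        lastBytes = currentBytes;
        lastTimestampMillis = nowMillis;
    }

    double getBytesPerSecond() {
        return bytesPerSecond;
    }

    public static void main(String[] args) {
        RateSketch rx = new RateSketch(1_000_000L, 0L);
        rx.update(1_500_000L, 1_000L); // +500 000 bytes over 1 s
        System.out.println(rx.getBytesPerSecond()); // 500000.0
    }
}
```

The same difference-over-interval logic applies to CPU ticks, where the delta of busy ticks divided by the delta of total ticks gives the usage fraction.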
[jira] [Commented] (FLINK-7812) Log system resources as metrics
[ https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281766#comment-16281766 ] ASF GitHub Bot commented on FLINK-7812: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4801#discussion_r155501129 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerITCase.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.apache.flink.configuration.TaskManagerOptions.ADDITIONAL_LOGGING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization in the {@link TaskManagerRunner}. + */ +public class TaskManagerRunnerITCase { --- End diff -- This test seems very specific to this logging, but is named as a generic TaskManagerRunner test. Give it a different name? Separate question: Does it have to be an IT case that fully starts the TM, or can it be a unit test that checks config propagation? > Log system resources as metrics > --- > > Key: FLINK-7812 > URL: https://issues.apache.org/jira/browse/FLINK-7812 > Project: Flink > Issue Type: New Feature > Components: Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155503866 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class SchedulerTest extends TestLogger { + + @Test + public void testAddAndRemoveInstance() { + try { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + Instance i1 = getRandomInstance(2); + Instance i2 = getRandomInstance(2); + Instance i3 = getRandomInstance(2); + + assertEquals(0, scheduler.getNumberOfAvailableInstances()); + assertEquals(0, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i1); + assertEquals(1, scheduler.getNumberOfAvailableInstances()); + assertEquals(2, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i3); + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + + // cannot add available instance again + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted instance twice"); + } + catch (IllegalArgumentException e) { + // bueno! 
+ } + + // some instances die + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + scheduler.instanceDied(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + + // try to add a dead instance + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted dead instance"); + } + catch (IllegalArgumentException e) { + // stimmt --- End diff -- ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155503994 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class SchedulerTest extends TestLogger { + + @Test + public void testAddAndRemoveInstance() { + try { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + Instance i1 = getRandomInstance(2); + Instance i2 = getRandomInstance(2); + Instance i3 = getRandomInstance(2); + + assertEquals(0, scheduler.getNumberOfAvailableInstances()); + assertEquals(0, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i1); + assertEquals(1, scheduler.getNumberOfAvailableInstances()); + assertEquals(2, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i3); + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + + // cannot add available instance again + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted instance twice"); + } + catch (IllegalArgumentException e) { + // bueno! 
+ } + + // some instances die + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + scheduler.instanceDied(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + + // try to add a dead instance + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted dead instance"); + } + catch (IllegalArgumentException e) { + // stimmt + + } + + scheduler.instanceDied(i1); + assertEquals(1, scheduler.getNumberOfAvailableInstances()); + assertEquals(2, scheduler.getNumberOfAvailableSlots()); + scheduler.instanceDied(i3); + assertEquals(0, scheduler.getNumberOfAvailableInstances()); + assertEquals(0, scheduler.getNumberOfAvailableSlots()); + + assertFalse(i1.isAlive()); + assertFalse(i2.isAlive()); + assertFalse(i3.isAlive()); + } + catch (Exception e) { --- End diff -- Better propagate the exception but I guess this file was copy pasted. ---
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281759#comment-16281759 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155502971 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Simple {@link AllocatedSlotActions} implementations for testing purposes. 
+ */ +public class TestingAllocatedSlotActions implements AllocatedSlotActions { + + private volatile Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer; + + public void setReleaseSlotConsumer(Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer) { + this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer); + } + + @Override + public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause) { + Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> currentReleaseSlotConsumer = this.releaseSlotConsumer; + + if (currentReleaseSlotConsumer != null) { + currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, cause)); --- End diff -- nit: whitespace after `cause` ``` ... cause )); ``` > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
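The stubbing pattern in `TestingAllocatedSlotActions` — a volatile callback field that a test installs in order to observe calls made by the code under test — can be reduced to this sketch. The types are simplified stand-ins, not Flink's `AllocatedSlotActions`:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Sketch of the test-stub pattern above: production code calls releaseSlot,
// and the test observes the call through an injectable callback.
public class StubSketch {

    // The original uses a volatile field; an AtomicReference gives the same
    // visibility guarantee here.
    private final AtomicReference<Consumer<String>> releaseSlotConsumer =
        new AtomicReference<>();

    void setReleaseSlotConsumer(Consumer<String> consumer) {
        releaseSlotConsumer.set(consumer);
    }

    // Stands in for AllocatedSlotActions.releaseSlot(...): read the callback
    // once, then invoke it only if a test actually installed one.
    void releaseSlot(String slotRequestId) {
        Consumer<String> current = releaseSlotConsumer.get();
        if (current != null) {
            current.accept(slotRequestId); // notify the test, if one listens
        }
    }

    public static void main(String[] args) {
        StubSketch actions = new StubSketch();
        StringBuilder observed = new StringBuilder();

        // The test installs a consumer, then exercises the code under test.
        actions.setReleaseSlotConsumer(observed::append);
        actions.releaseSlot("request-42");

        System.out.println(observed); // request-42
    }
}
```

Reading the field into a local before the null check matters: it makes the call safe even if another thread swaps the consumer between the check and the invocation.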
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
[jira] [Commented] (FLINK-8133) Generate documentation for new REST API
[ https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281754#comment-16281754 ] ASF GitHub Bot commented on FLINK-8133: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5052 merging. > Generate documentation for new REST API > --- > > Key: FLINK-8133 > URL: https://issues.apache.org/jira/browse/FLINK-8133 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5052: [FLINK-8133][REST][docs] Generate REST API documentation
[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources
[ https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281752#comment-16281752 ] ASF GitHub Bot commented on FLINK-8174: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5114 It depends a little bit on 1.4. We have serious build stability issues in the `release-1.3` branch and it would take a serious effort to stabilize it again. If I could choose, then we wouldn't have to release 1.3.3 because users switch directly to 1.4. > Mesos RM unable to accept offers for unreserved resources > - > > Key: FLINK-8174 > URL: https://issues.apache.org/jira/browse/FLINK-8174 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0, 1.3.3 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Blocker > Fix For: 1.4.0, 1.5.0 > > > Flink has suffered a regression due to FLINK-7294. Any attempt to accept a > resource offer that is based on unreserved resources will fail, because Flink > (as of FLINK-7294) erroneously insists that the resource come from a prior > reservation. > Looking at the original issue, the problem may have been misdiagnosed. > Ideally Flink should work with both reserved and unreserved resources, but > the latter is a more common situation that is now broken. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8133) Generate documentation for new REST API
[ https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281753#comment-16281753 ] ASF GitHub Bot commented on FLINK-8133: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5052#discussion_r155501188 --- Diff: flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java --- @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.docs.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.ConfigurationException; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jsonSchema.JsonSchema; +import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Generator for 
the Rest API documentation. + * + * This class can be either invoked directly --- End diff -- leftover comment, will remove it. > Generate documentation for new REST API > --- > > Key: FLINK-8133 > URL: https://issues.apache.org/jira/browse/FLINK-8133 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
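The generator reviewed above walks Flink's REST handler specifications and emits documentation pages. The core idea, reduced to a JDK-only toy (all names here are illustrative; the real generator works on `MessageHeaders` and Jackson JSON schemas), is to turn a list of endpoint specifications into a stable, sorted documentation table:

```java
import java.util.Comparator;
import java.util.List;

public class RestDocSketch {

    /** Stand-in for a handler specification: verb, path, and a description. */
    static final class Endpoint {
        final String method;
        final String path;
        final String description;

        Endpoint(String method, String path, String description) {
            this.method = method;
            this.path = path;
            this.description = description;
        }
    }

    /** Renders one table row per endpoint, sorted by path so output is deterministic. */
    static String renderMarkdown(List<Endpoint> endpoints) {
        StringBuilder sb = new StringBuilder("| Verb | Path | Description |\n|------|------|-------------|\n");
        endpoints.stream()
            .sorted(Comparator.comparing(e -> e.path))
            .forEach(e -> sb.append("| ").append(e.method)
                            .append(" | ").append(e.path)
                            .append(" | ").append(e.description).append(" |\n"));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(renderMarkdown(List.of(
            new Endpoint("GET", "/jobs", "Lists all jobs"),
            new Endpoint("GET", "/config", "Returns the cluster configuration"))));
    }
}
```

Sorting before rendering keeps regenerated docs diff-friendly, which matters when the output is checked into the repository.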
[GitHub] flink issue #5114: [FLINK-8174] [mesos] Mesos RM unable to accept offers for...
[GitHub] flink pull request #5052: [FLINK-8133][REST][docs] Generate REST API documen...
[jira] [Commented] (FLINK-7561) Add support for pre-aggregation in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281748#comment-16281748 ]

ASF GitHub Bot commented on FLINK-7561:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4626

    I think it would be nice to have a utility here in order to make this easier to use:
    ```java
    DataStream result = Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)),
            myAggregateFunction)
        .apply(windowFunction);
    ```
    The utility would basically take the aggregate function and insert the stream transformation for the pre-aggregation on the *predecessor* of the keyed stream, and then set up the `WindowedStream` again. Pseudo code:
    ```java
    public static WindowedStream preaggregate(
            WindowedStream windowedStream,
            AggregateFunction preAggregator) {

        // sanity check that the windowedStream has no custom trigger and evictor

        PreAggregationOperator preAggOp =
            new PreAggregationOperator(preAggregator, properties from windowed stream);

        DataStream originalStream = 'get predecessor before keyBy from windowed stream'
        DataStream preAggregated = originalStream.transform(preAggOp, ...);

        WindowedStream windowedAgain = preAggregated
            .keyBy(key extractor from original windowed stream)
            .window(assigner);

        return windowedAgain;
    }
    ```

> Add support for pre-aggregation in DataStream API
> -------------------------------------------------
>
>                 Key: FLINK-7561
>                 URL: https://issues.apache.org/jira/browse/FLINK-7561
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
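The point of inserting the pre-aggregation before the `keyBy` is that the network shuffle then carries one partial aggregate per key instead of every record. Stripped of the Flink operator machinery, the combine step looks like this JDK-only sketch (hypothetical names; the real operator plugs into the DataStream API and flushes on checkpoints or buffer limits):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PreAggregationSketch {

    /** Combines (key, value) records into per-key partial sums before the "shuffle". */
    static Map<String, Long> preAggregate(List<Map.Entry<String, Long>> records) {
        Map<String, Long> partials = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            // merge() folds each record into the running partial for its key
            partials.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return partials; // one entry per key, regardless of the input record count
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = List.of(
            Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L));
        System.out.println(preAggregate(records)); // {a=4, b=2}
    }
}
```

Three input records become two downstream records here; in a real job with many records per key and window, the reduction (and thus the network saving) is much larger.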
[GitHub] flink issue #4626: [FLINK-7561][streaming] Implement PreAggregationOperator
[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots
[ https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281732#comment-16281732 ] ASF GitHub Bot commented on FLINK-8089: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5090#discussion_r155496482 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java --- @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { } } + /** +* Tests that unused offered slots are directly used to fulfil pending slot +* requests. +* +* See FLINK-8089 +*/ + @Test + public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + final JobMasterId jobMasterId = JobMasterId.generate(); + final String jobMasterAddress = "foobar"; + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final SlotRequestID slotRequestId1 = new SlotRequestID(); + final SlotRequestID slotRequestId2 = new SlotRequestID(); + + try { + slotPool.start(jobMasterId, jobMasterAddress); + + final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + + final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class)); + + slotPoolGateway.connectToResourceManager(resourceManagerGateway); + + CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot( + slotRequestId1, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + // wait for the first slot request + final AllocationID allocationId = allocationIdFuture.get(); + + CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot( + slotRequestId2, + scheduledUnit, 
+ ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + slotPoolGateway.cancelSlotRequest(slotRequestId1); + + try { + // this should fail with a CancellationException + slotFuture1.get(); + fail("The first slot future should have failed because it was cancelled."); + } catch (ExecutionException ee) { + assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException); + } + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); + + // the slot offer should fulfil the second slot request --- End diff -- nit: same here > Fulfil slot requests with unused offered slots > -- > > Key: FLINK-8089 > URL: https://issues.apache.org/jira/browse/FLINK-8089 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > The {{SlotPool}} adds unused offered slots to the list of available slots > without checking whether another pending slot request could be fulfilled with > this slot. This should be changed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
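The behaviour the test above verifies — an offered slot whose original request was cancelled is handed to the next pending request rather than parked as free — can be sketched with plain `CompletableFuture`s (illustrative names, not the `SlotPool` API):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

public class PendingRequestPool {
    private final Deque<CompletableFuture<String>> pending = new ArrayDeque<>();

    /** Registers a slot request; the future completes once a slot is offered. */
    public CompletableFuture<String> requestSlot() {
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.add(f);
        return f;
    }

    /** Withdraws a request so a later offer cannot be wasted on it. */
    public void cancel(CompletableFuture<String> request) {
        pending.remove(request);
        request.cancel(false);
    }

    /** Offers a slot: fulfils the oldest still-pending request, if any. */
    public boolean offerSlot(String slotId) {
        CompletableFuture<String> next = pending.poll();
        if (next != null) {
            return next.complete(slotId);
        }
        return false; // a real pool would add the slot to its free list here
    }

    public static void main(String[] args) {
        PendingRequestPool pool = new PendingRequestPool();
        CompletableFuture<String> first = pool.requestSlot();
        CompletableFuture<String> second = pool.requestSlot();
        pool.cancel(first);          // first request is withdrawn, like slotRequestId1
        pool.offerSlot("slot-0");    // the offer goes to the second request
        System.out.println("second got: " + second.join());
        System.out.println("first cancelled: " + first.isCancelled());
    }
}
```

Removing the cancelled future from the queue before completing offers is the crucial step: without it, the offered slot would be consumed by a request nobody is waiting on.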
[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...
[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible
[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281731#comment-16281731 ]

ASF GitHub Bot commented on FLINK-8203:
---

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5132

    [FLINK-8203] [FLINK-7681] [table] Make schema definition of DataStream/DataSet to Table conversion more flexible

    ## What is the purpose of the change

    This PR makes the schema definition more flexible. It adds two ways of adding schema information:

    Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias (`as`)). In this mode, fields can be reordered and projected out. Moreover, we can define proctime and rowtime attributes at arbitrary positions using arbitrary names (except those that exist in the result schema). This mode can be used for any input type, including POJOs.

    Reference input fields by position: Field references must refer to existing fields in the input type (except for renaming with an alias (`as`)). In this mode, fields are simply renamed. Event-time attributes can replace the field on their position in the input data (if it is of correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the fields references a field of the input type. It also allows any TypeInformation.

    In the past, this behavior was not consistent. I will add some paragraphs to the documentation, once we agreed on this new behavior.

    ## Brief change log

    Various changes in `TableEnvironment`, `Stream/BatchTableEnvironment`, and pattern matches that referenced `AtomicType` instead of `TypeInformation`.

    ## Verifying this change

    See TableEnvironment tests.
    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? no
    - If yes, how is the feature documented? will document it later

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8203

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5132.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5132

commit 38562e1dcc5416996ad5531b901f89e4b868e5eb
Author: twalthr
Date:   2017-12-07T10:52:28Z

    [FLINK-8203] [FLINK-7681] [table] Make schema definition of DataStream/DataSet to Table conversion more flexible

> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as {{Table}},
> the schema of the table can be defined (by default it is extracted from the
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename
> fields, or define time attributes. Right now, there are several limitations
> how the fields can be defined that also depend on the type of the
> {{DataStream}} / {{DataSet}}.
> Types with explicit field ordering (e.g., tuples, case classes, Row) require
> schema definition based on the position of fields. Pojo types, which have no
> fixed order of fields, require referring to fields by name. Moreover, there
> are several restrictions on how time attributes can be defined, e.g., an
> event time attribute must replace an existing field or be appended, and
> proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are
> referenced by name (and possibly renamed using an alias ({{as}})). In this
> mode, fields can be reordered and projected out. Moreover, we can