[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108977967 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -221,10 +232,19 @@ public int getMaxParallelism() { * @param maxParallelism Maximum parallelism for this stream transformation. */ public void setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0 - && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM, - "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM - + ". Found: " + maxParallelism); + checkArgument( + parallelism != ExecutionConfig.PARALLELISM_DEFAULT, + "A maximum parallelism can only be specified with an explicitly specified " + + "parallelism."); --- End diff -- Yes, it is but the problem is that the treatment of the "default parallelism" is a bit strange. (The default parallelism is the parallelism that is set from the flink config or by the user on the command line using the `-p` parameter. Maybe we have to rework that before we can fix this. I'm starting to think that we should maybe revert the parallelism/max-parallelism changes on the release-1.2 branch and rework the whole thing properly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108977470 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -202,7 +203,17 @@ public int getParallelism() { * @param parallelism The new parallelism to set on this {@code StreamTransformation} */ public void setParallelism(int parallelism) { - Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero."); + checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); --- End diff -- It seems that in fact it was added here by mistake: https://github.com/apache/flink/commit/ec975aa --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108956486 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -202,7 +203,17 @@ public int getParallelism() { * @param parallelism The new parallelism to set on this {@code StreamTransformation} */ public void setParallelism(int parallelism) { - Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero."); + checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); --- End diff -- @greghogan I see. so I suppose we could remove it. This will simplify the checks here a bit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108948674 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -202,7 +203,17 @@ public int getParallelism() { * @param parallelism The new parallelism to set on this {@code StreamTransformation} */ public void setParallelism(int parallelism) { - Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero."); + checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); --- End diff -- `PARALLELISM_UNKNOWN` was removed in FLINK-3980. Not sure why it was added back unless this was unintentional. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108943809 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -221,10 +232,19 @@ public int getMaxParallelism() { * @param maxParallelism Maximum parallelism for this stream transformation. */ public void setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0 - && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM, - "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM - + ". Found: " + maxParallelism); + checkArgument( + parallelism != ExecutionConfig.PARALLELISM_DEFAULT, + "A maximum parallelism can only be specified with an explicitly specified " + + "parallelism."); + checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0."); + checkArgument( + maxParallelism >= parallelism, + "The maximum parallelism must be larger than the parallelism. (parallelism = " + + parallelism + " max-parallelism = " + maxParallelism); --- End diff -- Missing closing ")" at the end of the error message. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108943516 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -202,7 +203,17 @@ public int getParallelism() { * @param parallelism The new parallelism to set on this {@code StreamTransformation} */ public void setParallelism(int parallelism) { - Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero."); + checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); --- End diff -- The `PARALLELISM_UNKNOWN` does not seem to be used anywhere in the codebase, apart from checking against it. Couldn't we remove it? Or am I missing something? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108944527 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java --- @@ -221,10 +232,19 @@ public int getMaxParallelism() { * @param maxParallelism Maximum parallelism for this stream transformation. */ public void setMaxParallelism(int maxParallelism) { - Preconditions.checkArgument(maxParallelism > 0 - && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM, - "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM - + ". Found: " + maxParallelism); + checkArgument( + parallelism != ExecutionConfig.PARALLELISM_DEFAULT, + "A maximum parallelism can only be specified with an explicitly specified " + + "parallelism."); --- End diff -- This means that we always have to specify a `parallelism` before being able to specify a `maxParallelism`. This seems a bit counter-intuitive to me, as the only constraint seems to be that `parallelism <= maxParalleliem`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3616#discussion_r108943194 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java --- @@ -50,6 +52,83 @@ @SuppressWarnings("serial") public class StreamingJobGraphGeneratorTest extends TestLogger { --- End diff -- I would suggest to add some tests (the same code as the existing ones) with invalid configurations and the expected exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3616 [FLINK-6188] Correctly handle PARALLELISM_DEFAULT in stream operator Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-6188-set-parallelism Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3616.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3616 commit 40ca699cc1f6b00d6d50b16d6833794c5362dbb1 Author: Aljoscha KrettekDate: 2017-03-26T10:23:01Z [FLINK-6188] Handle PARALLELISM_DEFAULT in setParallelism() SingleOutputStreamOperator.setParallelism() and StreamTransform.setParallelism() did not correctly handle the case of setting the parallelism to PARALLELISM_DEFAULT. commit 9107a3400af877ddfefcc0f5b16cc6b03127ec76 Author: Aljoscha Krettek Date: 2017-03-26T10:24:58Z [FLINK-6188] Add TimestampAssignerTranslationTest The tests verify that we instantiate the correct operator and that they correctly pick up the parallelism form the upstream operator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---