[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108977967
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -221,10 +232,19 @@ public int getMaxParallelism() {
 * @param maxParallelism Maximum parallelism for this stream transformation.
 */
public void setMaxParallelism(int maxParallelism) {
-   Preconditions.checkArgument(maxParallelism > 0
-   && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
-   "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
-   + ". Found: " + maxParallelism);
+   checkArgument(
+   parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
+   "A maximum parallelism can only be specified with an explicitly specified " +
+   "parallelism.");
--- End diff --

Yes, it is, but the problem is that the treatment of the "default 
parallelism" is a bit strange. (The default parallelism is the parallelism that 
is set from the Flink config or by the user on the command line using the `-p` 
parameter.) Maybe we have to rework that before we can fix this.

I'm starting to think that we should maybe revert the 
parallelism/max-parallelism changes on the release-1.2 branch and rework the 
whole thing properly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108977470
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -202,7 +203,17 @@ public int getParallelism() {
 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
 */
public void setParallelism(int parallelism) {
-   Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+   checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
--- End diff --

It seems that in fact it was added here by mistake: 
https://github.com/apache/flink/commit/ec975aa




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108956486
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -202,7 +203,17 @@ public int getParallelism() {
 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
 */
public void setParallelism(int parallelism) {
-   Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+   checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
--- End diff --

@greghogan I see. So I suppose we could remove it. This will simplify the 
checks here a bit.




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108948674
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -202,7 +203,17 @@ public int getParallelism() {
 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
 */
public void setParallelism(int parallelism) {
-   Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+   checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
--- End diff --

`PARALLELISM_UNKNOWN` was removed in FLINK-3980. Not sure why it was added 
back unless this was unintentional.




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108943809
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -221,10 +232,19 @@ public int getMaxParallelism() {
 * @param maxParallelism Maximum parallelism for this stream transformation.
 */
public void setMaxParallelism(int maxParallelism) {
-   Preconditions.checkArgument(maxParallelism > 0
-   && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
-   "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
-   + ". Found: " + maxParallelism);
+   checkArgument(
+   parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
+   "A maximum parallelism can only be specified with an explicitly specified " +
+   "parallelism.");
+   checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+   checkArgument(
+   maxParallelism >= parallelism,
+   "The maximum parallelism must be larger than the parallelism. (parallelism = " +
+   parallelism + " max-parallelism = " + maxParallelism);
--- End diff --

Missing closing ")" at the end of the error message.
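
For illustration, a minimal sketch of the message with the closing parenthesis restored. The class `MaxParallelismMessage` and its `describe` method are hypothetical names used only for this example and are not part of the PR; the message wording is copied from the diff above.

```java
/** Hypothetical helper, not part of the PR: builds the failure message with balanced parentheses. */
final class MaxParallelismMessage {

    private MaxParallelismMessage() {}

    /** Formats the message reported when maxParallelism is smaller than parallelism. */
    static String describe(int parallelism, int maxParallelism) {
        return "The maximum parallelism must be larger than the parallelism. (parallelism = "
                + parallelism + " max-parallelism = " + maxParallelism + ")";
    }

    public static void main(String[] args) {
        // Print one sample message so the balanced parentheses can be checked by eye.
        System.out.println(describe(8, 4));
    }
}
```

Since the check in the diff is `maxParallelism >= parallelism`, equal values pass, so "larger than or equal to" may describe the condition more precisely than "larger than".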




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108943516
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -202,7 +203,17 @@ public int getParallelism() {
 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
 */
public void setParallelism(int parallelism) {
-   Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+   checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
--- End diff --

`PARALLELISM_UNKNOWN` does not seem to be used anywhere in the codebase, 
apart from the checks against it. Couldn't we remove it? Or am I 
missing something?




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108944527
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
@@ -221,10 +232,19 @@ public int getMaxParallelism() {
 * @param maxParallelism Maximum parallelism for this stream transformation.
 */
public void setMaxParallelism(int maxParallelism) {
-   Preconditions.checkArgument(maxParallelism > 0
-   && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
-   "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
-   + ". Found: " + maxParallelism);
+   checkArgument(
+   parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
+   "A maximum parallelism can only be specified with an explicitly specified " +
+   "parallelism.");
--- End diff --

This means that we always have to specify a `parallelism` before being able 
to specify a `maxParallelism`. This seems a bit counter-intuitive to me, as the 
only constraint seems to be that `parallelism <= maxParallelism`.
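
One way to read this suggestion, sketched under stated assumptions: the setters accept either call order and compare the two values only once both are explicit. `ParallelismSettings`, its sentinel values and its private `checkArgument` helper are hypothetical stand-ins for illustration, not Flink's `StreamTransformation`. The sketch does not cover the harder case raised elsewhere in this thread, where a default parallelism is only resolved later and the comparison would have to be deferred until then.

```java
/**
 * Hypothetical stand-in for a stream transformation; names and sentinel
 * values are assumptions for illustration, not Flink's actual class.
 */
final class ParallelismSettings {

    /** Assumed sentinel meaning "no explicit parallelism was set". */
    static final int PARALLELISM_DEFAULT = -1;

    /** Assumed sentinel meaning "no maximum parallelism was set". */
    static final int MAX_PARALLELISM_UNSET = -1;

    private int parallelism = PARALLELISM_DEFAULT;
    private int maxParallelism = MAX_PARALLELISM_UNSET;

    void setParallelism(int parallelism) {
        checkArgument(
                parallelism == PARALLELISM_DEFAULT || parallelism > 0,
                "The parallelism must be greater than 0 or PARALLELISM_DEFAULT.");
        // Compare against the maximum only if both values are explicit.
        checkArgument(
                parallelism == PARALLELISM_DEFAULT
                        || maxParallelism == MAX_PARALLELISM_UNSET
                        || parallelism <= maxParallelism,
                "The parallelism must not exceed the maximum parallelism.");
        this.parallelism = parallelism;
    }

    void setMaxParallelism(int maxParallelism) {
        checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
        // A default (not yet resolved) parallelism cannot be compared here.
        checkArgument(
                parallelism == PARALLELISM_DEFAULT || parallelism <= maxParallelism,
                "The parallelism must not exceed the maximum parallelism.");
        this.maxParallelism = maxParallelism;
    }

    int getParallelism() {
        return parallelism;
    }

    int getMaxParallelism() {
        return maxParallelism;
    }

    /** Local replacement for a Preconditions-style check, to keep the sketch self-contained. */
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

With this shape, `setMaxParallelism(128)` succeeds on a fresh instance, and a later `setParallelism(256)` is rejected because both values are then explicit.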




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3616#discussion_r108943194
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---
@@ -50,6 +52,83 @@
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
 
--- End diff --

I would suggest adding some tests (modeled on the existing ones) that use 
invalid configurations and assert the expected exception.
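
A rough sketch of what such negative tests could look like, written as plain Java so it runs without a test framework. `InvalidParallelismChecks`, `validate` and `rejects` are hypothetical names, and `validate` only mirrors two of the checks from the diff in this thread; it does not call into Flink. In the real test class the same cases would more naturally be JUnit tests that expect an `IllegalArgumentException`.

```java
/** Hypothetical negative-test sketch; validate() only mimics the checks discussed in this thread. */
final class InvalidParallelismChecks {

    private InvalidParallelismChecks() {}

    /** Stand-in for the code under test: rejects invalid parallelism/max-parallelism pairs. */
    static void validate(int parallelism, int maxParallelism) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("The maximum parallelism must be greater than 0.");
        }
        if (maxParallelism < parallelism) {
            throw new IllegalArgumentException(
                    "The maximum parallelism must be larger than the parallelism. (parallelism = "
                            + parallelism + " max-parallelism = " + maxParallelism + ")");
        }
    }

    /** Returns true if validate() throws the expected IllegalArgumentException. */
    static boolean rejects(int parallelism, int maxParallelism) {
        try {
            validate(parallelism, maxParallelism);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Each row is {parallelism, maxParallelism} and is expected to be rejected.
        int[][] invalid = {{4, 0}, {4, -1}, {8, 4}};
        for (int[] config : invalid) {
            if (!rejects(config[0], config[1])) {
                throw new AssertionError(
                        "Expected rejection for parallelism=" + config[0]
                                + ", maxParallelism=" + config[1]);
            }
        }
        // A valid configuration must still pass, otherwise the checks are too strict.
        if (rejects(4, 4)) {
            throw new AssertionError("parallelism == maxParallelism should be accepted");
        }
        System.out.println("All invalid configurations were rejected.");
    }
}
```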




[GitHub] flink pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT ...

2017-03-26 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3616

[FLINK-6188] Correctly handle PARALLELISM_DEFAULT in stream operator


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-6188-set-parallelism

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3616


commit 40ca699cc1f6b00d6d50b16d6833794c5362dbb1
Author: Aljoscha Krettek 
Date:   2017-03-26T10:23:01Z

[FLINK-6188] Handle PARALLELISM_DEFAULT in setParallelism()

SingleOutputStreamOperator.setParallelism() and
StreamTransformation.setParallelism() did not correctly handle the case of
setting the parallelism to PARALLELISM_DEFAULT.

commit 9107a3400af877ddfefcc0f5b16cc6b03127ec76
Author: Aljoscha Krettek 
Date:   2017-03-26T10:24:58Z

[FLINK-6188] Add TimestampAssignerTranslationTest

The tests verify that we instantiate the correct operator and that they
correctly pick up the parallelism from the upstream operator.



