[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905086#comment-15905086
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3487


> Expose max-parallelism value in RuntimeContext
> --
>
> Key: FLINK-5980
> URL: https://issues.apache.org/jira/browse/FLINK-5980
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Biao Liu
> Assignee: Biao Liu
> Priority: Minor
>
> I am implementing a custom source function. I want to keep all progress in 
> a ListState to support dynamic scaling, as FlinkKafkaConsumer does. I think 
> the max-parallelism value is the ideal length for the ListState in my 
> scenario, but currently the max-parallelism value is not visible to UDFs. 
> I propose exposing the max-parallelism value in RuntimeContext. It will be 
> useful in dynamic scaling scenarios.
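
The sizing idea in the description can be sketched without any Flink dependency. The following is a minimal illustration, assuming a hypothetical `SlotAssignment` helper and a simple round-robin rule for mapping slots to subtasks; neither is part of Flink, and the actual redistribution of list state is done by the framework. It only shows why a list with max-parallelism entries can be spread over any parallelism up to that maximum.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not a Flink class: maps a fixed number of progress slots onto subtasks. */
public class SlotAssignment {

    /**
     * Returns the slot indices owned by one subtask.
     * Assumption for this sketch: slots are dealt out round-robin (slot % parallelism).
     */
    public static List<Integer> slotsFor(int subtaskIndex, int parallelism, int maxParallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        List<Integer> owned = new ArrayList<>();
        // Every slot index is visited by exactly one subtask, so no progress entry is lost or duplicated.
        for (int slot = subtaskIndex; slot < maxParallelism; slot += parallelism) {
            owned.add(slot);
        }
        return owned;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        // The same 8 slots are redistributed when the job is rescaled from 2 to 3 subtasks.
        for (int parallelism : new int[] {2, 3}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                System.out.println(parallelism + " subtasks, subtask " + subtask
                        + " owns slots " + slotsFor(subtask, parallelism, maxParallelism));
            }
        }
    }
}
```

Because the number of slots never changes, a rescale only changes which subtask owns which slot, which is the property the reporter wants from a list sized by max-parallelism.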



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904826#comment-15904826
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3487
  
I think this looks good now, merging this...




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902558#comment-15902558
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3487
  
Thank you, Stephan and zentol.
Good question and suggestion. I had not given batch jobs much thought.
Also, I think naming the variable "numberOfKeyGroups" in TaskInfo is a bad 
idea. Keeping max-parallelism is better: it is more common and makes much 
more sense for other scenarios.
I will make sure it works with batch jobs and update this PR later.




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901225#comment-15901225
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3487
  
For batch jobs, I would suggest that we make sure that the 
`ExecutionEnvironment` or the `JobGraphGenerator` sets the max parallelism to 
the parallelism. Then we do not need to differentiate in the RuntimeContext.
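
One way to read this suggestion, sketched with hypothetical stand-in code rather than the actual `ExecutionEnvironment` or `JobGraphGenerator` logic: if job setup fills in an unset max parallelism with the parallelism, the runtime context can return the stored value unconditionally. `JobSetup`, `UNSET`, and `resolveMaxParallelism` are names invented for this illustration.

```java
/** Hypothetical stand-in for job-setup logic; not Flink code. */
public class JobSetup {

    /** Assumed sentinel meaning "no max parallelism was configured". */
    public static final int UNSET = -1;

    /**
     * Resolves the max parallelism once, at job-setup time.
     * A job that leaves it unset (the batch case in this sketch) falls back to its parallelism.
     */
    public static int resolveMaxParallelism(int parallelism, int configuredMaxParallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        if (configuredMaxParallelism == UNSET) {
            return parallelism;
        }
        if (configuredMaxParallelism < parallelism) {
            throw new IllegalArgumentException("max parallelism must not be below parallelism");
        }
        return configuredMaxParallelism;
    }

    public static void main(String[] args) {
        // Unset max parallelism: the parallelism is used.
        System.out.println(resolveMaxParallelism(4, UNSET));
        // Explicit max parallelism: the configured value is kept.
        System.out.println(resolveMaxParallelism(4, 128));
    }
}
```

With the fallback applied in one place, a getter on the context needs no batch-versus-streaming branch.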




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901101#comment-15901101
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3487#discussion_r104896041
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -97,10 +97,16 @@ public int getNumberOfParallelSubtasks() {
     }
 
     @Override
+    public int getMaxNumberOfParallelSubtasks() {
+        // Number of KeyGroups is same with max-parallelism currently.
+        return taskInfo.getNumberOfKeyGroups();
--- End diff --

What does this return for the batch API, which doesn't have key-groups? 
(Ideally it should return ```getNumberOfParallelSubtasks()```.)




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900536#comment-15900536
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3487#discussion_r104825967
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -85,6 +85,13 @@
     int getIndexOfThisSubtask();
 
     /**
+     * Gets the number of max-parallelism with which the parallel task runs.
+     *
+     * @return The max-parallelism with which the parallel task runs.
+     */
+    int getMaxParallelismOfSubtasks();
--- End diff --

That's a better name.




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899647#comment-15899647
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3487#discussion_r104703506
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -85,6 +85,13 @@
     int getIndexOfThisSubtask();
 
     /**
+     * Gets the number of max-parallelism with which the parallel task runs.
+     *
+     * @return The max-parallelism with which the parallel task runs.
+     */
+    int getMaxParallelismOfSubtasks();
--- End diff --

should this maybe be called ```getMaxNumberOfParallelSubtasks``` to be 
consistent with other methods?




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899650#comment-15899650
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3487#discussion_r104703639
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -100,7 +100,12 @@ public int getNumberOfParallelSubtasks() {
     public int getIndexOfThisSubtask() {
         return taskInfo.getIndexOfThisSubtask();
     }
-   
+
+    @Override
+    public int getMaxParallelismOfSubtasks() {
+        // Number of KeyGroups is same with max-parallelism currently.
+        return taskInfo.getNumberOfKeyGroups();
+    }
--- End diff --

missing empty line




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899648#comment-15899648
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3487#discussion_r104703690
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
 ---
@@ -125,6 +125,10 @@ public int getIndexOfThisSubtask() {
     }
 
     @Override
+    public int getMaxParallelismOfSubtasks() {
+        return runtimeContext.getMaxParallelismOfSubtasks();
+    }
--- End diff --

missing empty line




[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899353#comment-15899353
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

GitHub user ifndef-SleePy opened a pull request:

https://github.com/apache/flink/pull/3487

[FLINK-5980] Expose max-parallelism value in RuntimeContext.

Add new method named getMaxParallelismOfSubtasks in RuntimeContext.
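
The shape of the change can be illustrated with simplified stand-ins. `MiniRuntimeContext`, `MiniTaskInfo`, `TaskContext`, and `ForwardingContext` below are hypothetical and much smaller than the real `RuntimeContext`, `TaskInfo`, and wrapping contexts; they only show that the new getter reads a value carried by the task info and that any context wrapping another one has to forward the call.

```java
/** Self-contained sketch with hypothetical stand-in types; not the Flink classes themselves. */
public class MaxParallelismSketch {

    /** Simplified stand-in for the runtime context interface. */
    public interface MiniRuntimeContext {
        int getNumberOfParallelSubtasks();

        /** The new getter, using the method name proposed in this pull request. */
        int getMaxParallelismOfSubtasks();
    }

    /** Simplified stand-in for the per-task information object. */
    public static final class MiniTaskInfo {
        final int parallelism;
        final int maxParallelism;

        public MiniTaskInfo(int parallelism, int maxParallelism) {
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    /** Context backed directly by the task info. */
    public static final class TaskContext implements MiniRuntimeContext {
        private final MiniTaskInfo taskInfo;

        public TaskContext(MiniTaskInfo taskInfo) {
            this.taskInfo = taskInfo;
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return taskInfo.parallelism;
        }

        @Override
        public int getMaxParallelismOfSubtasks() {
            return taskInfo.maxParallelism;
        }
    }

    /** Wrapper that forwards every call to the context it wraps. */
    public static final class ForwardingContext implements MiniRuntimeContext {
        private final MiniRuntimeContext delegate;

        public ForwardingContext(MiniRuntimeContext delegate) {
            this.delegate = delegate;
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return delegate.getNumberOfParallelSubtasks();
        }

        @Override
        public int getMaxParallelismOfSubtasks() {
            return delegate.getMaxParallelismOfSubtasks();
        }
    }

    public static void main(String[] args) {
        MiniRuntimeContext ctx = new ForwardingContext(new TaskContext(new MiniTaskInfo(4, 128)));
        System.out.println(ctx.getNumberOfParallelSubtasks()
                + " subtasks, at most " + ctx.getMaxParallelismOfSubtasks());
    }
}
```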

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5980

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3487


commit b7844d43d053d447a905fa727916683e06ec583d
Author: biao.liub 
Date:   2017-03-07T12:11:10Z

[FLINK-5980] Expose max-parallelism value in RuntimeContext.



