[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905086#comment-15905086 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3487

> Expose max-parallelism value in RuntimeContext
> ----------------------------------------------
>
>                 Key: FLINK-5980
>                 URL: https://issues.apache.org/jira/browse/FLINK-5980
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Biao Liu
>            Assignee: Biao Liu
>            Priority: Minor
>
> I am implementing a custom source function. I want to keep all progress in
> a ListState to support dynamic scaling, just like FlinkKafkaConsumer does.
> I think the max-parallelism value is the ideal length for the ListState in
> my scenario, but I realized that the max-parallelism value is currently not
> visible to UDFs.
> I propose exposing the max-parallelism value in RuntimeContext. It will be
> useful in dynamic scaling scenarios.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
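The reporter's idea can be illustrated without any Flink dependency. The sketch below assumes a source that keeps exactly maxParallelism progress slots and lets each subtask claim its slots by index; because the slot count never changes, rescaling only moves whole slots between subtasks. The class and method names are hypothetical, and the contiguous-range formula is an assumption chosen for illustration (similar in spirit to key-group ranges), not Flink's own ListState redistribution logic.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: a source keeps one progress slot per index in
 * [0, maxParallelism) and each parallel subtask claims a contiguous range.
 * In a real source, maxParallelism would come from the runtime context
 * (the value this issue proposes to expose); here it is a plain parameter.
 */
final class ProgressSlotAssignment {

    private ProgressSlotAssignment() {}

    /** Returns the slot indices owned by the given subtask, in ascending order. */
    static List<Integer> slotsForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("require 0 < parallelism <= maxParallelism");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        // Inclusive bounds of this subtask's range; ranges of neighbouring
        // subtasks are adjacent, so every slot has exactly one owner.
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        List<Integer> slots = new ArrayList<>();
        for (int slot = start; slot <= end; slot++) {
            slots.add(slot);
        }
        return slots;
    }
}
```

For example, with maxParallelism = 8 and parallelism = 3 the subtasks own slots [0, 1, 2], [3, 4, 5] and [6, 7]; after rescaling to parallelism 2 they own [0, 1, 2, 3] and [4, 5, 6, 7], so no progress entry has to be split or merged.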
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904826#comment-15904826 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3487

    I think this looks good now, merging this...
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902558#comment-15902558 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user ifndef-SleePy commented on the issue:

    https://github.com/apache/flink/pull/3487

    Thank you, Stephan and zentol. Good question and suggestion. I didn't consider batch jobs much.
    Also, I think naming the variable "numberOfKeyGroups" in TaskInfo is a bad idea; keeping max-parallelism is better. It's more common and makes much more sense in other scenarios.
    I will make sure it works with batch jobs and update this PR later.
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901225#comment-15901225 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3487

    For batch jobs, I would suggest that we make sure that the `ExecutionEnvironment` or the `JobGraphGenerator` sets the max parallelism to the parallelism. Then we do not need to differentiate in the RuntimeContext.
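The suggestion above can be sketched as follows: the max-parallelism is settled once, when the job graph is built, and the runtime context merely reports the stored value. All names here are hypothetical stand-ins, not Flink's `ExecutionEnvironment` or `JobGraphGenerator` code, and rejecting a streaming max-parallelism smaller than the parallelism is an assumption of the sketch.

```java
/**
 * Hypothetical sketch: resolve max-parallelism once, at job-graph generation
 * time, so the runtime context never branches on batch vs. streaming.
 */
final class MaxParallelismResolver {

    private MaxParallelismResolver() {}

    static int resolve(boolean isBatchJob, int parallelism, int configuredMaxParallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (isBatchJob) {
            // Batch jobs have no key groups, so max-parallelism is pinned to parallelism.
            return parallelism;
        }
        if (configuredMaxParallelism < parallelism) {
            throw new IllegalArgumentException("max-parallelism must not be smaller than parallelism");
        }
        return configuredMaxParallelism;
    }
}

/** Hypothetical runtime context: it only reports what was resolved earlier. */
final class SimpleTaskContext {
    private final int parallelism;
    private final int maxParallelism;

    SimpleTaskContext(int parallelism, int maxParallelism) {
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }

    int getNumberOfParallelSubtasks() {
        return parallelism;
    }

    // No batch/streaming special case is needed here.
    int getMaxNumberOfParallelSubtasks() {
        return maxParallelism;
    }
}
```

The design choice is that the batch special case lives in exactly one place; the alternative raised in review, having the accessor fall back to `getNumberOfParallelSubtasks()`, would put that branch into the runtime context itself.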
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901101#comment-15901101 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3487#discussion_r104896041

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---
    @@ -97,10 +97,16 @@ public int getNumberOfParallelSubtasks() {
         }
         @Override
    +    public int getMaxNumberOfParallelSubtasks() {
    +        // Number of KeyGroups is same with max-parallelism currently.
    +        return taskInfo.getNumberOfKeyGroups();
    --- End diff --

    What does this return for the batch API, which doesn't have key groups? (Ideally it should return ```getNumberOfParallelSubtasks()```.)
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900536#comment-15900536 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user ifndef-SleePy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3487#discussion_r104825967

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -85,6 +85,13 @@ int getIndexOfThisSubtask();
         /**
    +     * Gets the number of max-parallelism with which the parallel task runs.
    +     *
    +     * @return The max-parallelism with which the parallel task runs.
    +     */
    +    int getMaxParallelismOfSubtasks();
    --- End diff --

    That's a better name.
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899647#comment-15899647 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3487#discussion_r104703506

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
    @@ -85,6 +85,13 @@ int getIndexOfThisSubtask();
         /**
    +     * Gets the number of max-parallelism with which the parallel task runs.
    +     *
    +     * @return The max-parallelism with which the parallel task runs.
    +     */
    +    int getMaxParallelismOfSubtasks();
    --- End diff --

    should this maybe be called ```getMaxNumberOfParallelSubtasks``` to be consistent with other methods?
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899650#comment-15899650 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3487#discussion_r104703639

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---
    @@ -100,7 +100,12 @@ public int getNumberOfParallelSubtasks() {
         public int getIndexOfThisSubtask() {
             return taskInfo.getIndexOfThisSubtask();
         }
    -
    +
    +    @Override
    +    public int getMaxParallelismOfSubtasks() {
    +        // Number of KeyGroups is same with max-parallelism currently.
    +        return taskInfo.getNumberOfKeyGroups();
    +    }
    --- End diff --

    missing empty line
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899648#comment-15899648 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3487#discussion_r104703690

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java ---
    @@ -125,6 +125,10 @@ public int getIndexOfThisSubtask() {
             }
             @Override
    +        public int getMaxParallelismOfSubtasks() {
    +            return runtimeContext.getMaxParallelismOfSubtasks();
    +        }
    --- End diff --

    missing empty line
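Adding a method to an interface means every implementation and every wrapper has to provide it, which is why the PR touches both `AbstractRuntimeUDFContext` and the context wrapper in `RichAsyncFunction`. The forwarding pattern in the diff can be sketched with a small hypothetical interface; it is not Flink's `RuntimeContext`, only the three parallelism accessors discussed in this thread, using the `getMaxNumberOfParallelSubtasks` name suggested in review.

```java
/** Hypothetical stand-in for the parallelism accessors of a runtime context. */
interface ParallelismInfo {
    int getNumberOfParallelSubtasks();

    int getMaxNumberOfParallelSubtasks();

    int getIndexOfThisSubtask();
}

/** Implementation backed by fixed task metadata. */
final class FixedParallelismInfo implements ParallelismInfo {
    private final int parallelism;
    private final int maxParallelism;
    private final int subtaskIndex;

    FixedParallelismInfo(int parallelism, int maxParallelism, int subtaskIndex) {
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return parallelism;
    }

    @Override
    public int getMaxNumberOfParallelSubtasks() {
        return maxParallelism;
    }

    @Override
    public int getIndexOfThisSubtask() {
        return subtaskIndex;
    }
}

/**
 * Wrapper that forwards each call to the wrapped context. If the interface
 * gains a method and this class is not updated, it no longer compiles,
 * which is what forces the extra change in the wrapper class.
 */
final class ForwardingParallelismInfo implements ParallelismInfo {
    private final ParallelismInfo delegate;

    ForwardingParallelismInfo(ParallelismInfo delegate) {
        this.delegate = delegate;
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return delegate.getNumberOfParallelSubtasks();
    }

    @Override
    public int getMaxNumberOfParallelSubtasks() {
        return delegate.getMaxNumberOfParallelSubtasks();
    }

    @Override
    public int getIndexOfThisSubtask() {
        return delegate.getIndexOfThisSubtask();
    }
}
```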
[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899353#comment-15899353 ]

ASF GitHub Bot commented on FLINK-5980:
---------------------------------------

GitHub user ifndef-SleePy opened a pull request:

    https://github.com/apache/flink/pull/3487

    [FLINK-5980] Expose max-parallelism value in RuntimeContext.

    Adds a new method named getMaxParallelismOfSubtasks to RuntimeContext.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-5980

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3487

----
commit b7844d43d053d447a905fa727916683e06ec583d
Author: biao.liub
Date:   2017-03-07T12:11:10Z

    [FLINK-5980] Expose max-parallelism value in RuntimeContext.