[6/7] flink git commit: [hotfix] Expose AllocationID as string through TaskInfo
[hotfix] Expose AllocationID as string through TaskInfo (cherry picked from commit edece9c) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbe3cbfa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbe3cbfa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbe3cbfa Branch: refs/heads/release-1.5 Commit: fbe3cbfae9d08f8e5f9c21f0f075944b239cb50c Parents: eba462e Author: Stefan Richter Authored: Tue Mar 13 13:53:48 2018 +0100 Committer: Stefan Richter Committed: Thu May 17 10:07:52 2018 +0200 -- .../org/apache/flink/api/common/TaskInfo.java | 36 ++-- .../util/AbstractRuntimeUDFContext.java | 7 .../apache/flink/runtime/taskmanager/Task.java | 3 +- 3 files changed, 43 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java index 33f2e0c..2583687 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java @@ -20,8 +20,8 @@ package org.apache.flink.api.common; import org.apache.flink.annotation.Internal; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number. @@ -31,12 +31,35 @@ public class TaskInfo { private final String taskName; private final String taskNameWithSubtasks; + private final String allocationIDAsString; private final int maxNumberOfParallelSubtasks; private final int indexOfSubtask; private final int numberOfParallelSubtasks; private final int attemptNumber; - public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) { + public TaskInfo( + String taskName, + int maxNumberOfParallelSubtasks, + int indexOfSubtask, + int numberOfParallelSubtasks, + int attemptNumber) { + this( + taskName, + maxNumberOfParallelSubtasks, + indexOfSubtask, + numberOfParallelSubtasks, + attemptNumber, + "UNKNOWN"); + } + + public TaskInfo( + String taskName, + int maxNumberOfParallelSubtasks, + int indexOfSubtask, + int numberOfParallelSubtasks, + int attemptNumber, + String allocationIDAsString) { + checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number."); checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number."); checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism."); @@ -49,6 +72,7 @@ public class TaskInfo { this.numberOfParallelSubtasks = numberOfParallelSubtasks; this.attemptNumber = attemptNumber; this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')'; + this.allocationIDAsString = checkNotNull(allocationIDAsString); } /** @@ -107,4 +131,12 @@ public class TaskInfo { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationIDAsString() { + return allocationIDAsString; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 6246e80..d6262c7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java
[6/7] flink git commit: [hotfix] Expose AllocationID as string through TaskInfo
[hotfix] Expose AllocationID as string through TaskInfo Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edece9c1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edece9c1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edece9c1 Branch: refs/heads/master Commit: edece9c1e2ec8e759b783ea07dcaa50d4e8704a2 Parents: 8993599 Author: Stefan Richter Authored: Tue Mar 13 13:53:48 2018 +0100 Committer: Stefan Richter Committed: Thu May 17 10:03:04 2018 +0200 -- .../org/apache/flink/api/common/TaskInfo.java | 36 ++-- .../util/AbstractRuntimeUDFContext.java | 7 .../apache/flink/runtime/taskmanager/Task.java | 3 +- 3 files changed, 43 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java index 33f2e0c..2583687 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java @@ -20,8 +20,8 @@ package org.apache.flink.api.common; import org.apache.flink.annotation.Internal; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number. @@ -31,12 +31,35 @@ public class TaskInfo { private final String taskName; private final String taskNameWithSubtasks; + private final String allocationIDAsString; private final int maxNumberOfParallelSubtasks; private final int indexOfSubtask; private final int numberOfParallelSubtasks; private final int attemptNumber; - public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) { + public TaskInfo( + String taskName, + int maxNumberOfParallelSubtasks, + int indexOfSubtask, + int numberOfParallelSubtasks, + int attemptNumber) { + this( + taskName, + maxNumberOfParallelSubtasks, + indexOfSubtask, + numberOfParallelSubtasks, + attemptNumber, + "UNKNOWN"); + } + + public TaskInfo( + String taskName, + int maxNumberOfParallelSubtasks, + int indexOfSubtask, + int numberOfParallelSubtasks, + int attemptNumber, + String allocationIDAsString) { + checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number."); checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number."); checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism."); @@ -49,6 +72,7 @@ public class TaskInfo { this.numberOfParallelSubtasks = numberOfParallelSubtasks; this.attemptNumber = attemptNumber; this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')'; + this.allocationIDAsString = checkNotNull(allocationIDAsString); } /** @@ -107,4 +131,12 @@ public class TaskInfo { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationIDAsString() { + return allocationIDAsString; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 6246e80..d6262c7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/uti