[6/7] flink git commit: [hotfix] Expose AllocationID as string through TaskInfo

2018-05-17 Thread srichter
[hotfix] Expose AllocationID as string through TaskInfo

(cherry picked from commit edece9c)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbe3cbfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbe3cbfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbe3cbfa

Branch: refs/heads/release-1.5
Commit: fbe3cbfae9d08f8e5f9c21f0f075944b239cb50c
Parents: eba462e
Author: Stefan Richter 
Authored: Tue Mar 13 13:53:48 2018 +0100
Committer: Stefan Richter 
Committed: Thu May 17 10:07:52 2018 +0200

--
 .../org/apache/flink/api/common/TaskInfo.java   | 36 ++--
 .../util/AbstractRuntimeUDFContext.java |  7 
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 3 files changed, 43 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 33f2e0c..2583687 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
@@ -31,12 +31,35 @@ public class TaskInfo {
 
private final String taskName;
private final String taskNameWithSubtasks;
+   private final String allocationIDAsString;
private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
 
-   public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+   public TaskInfo(
+   String taskName,
+   int maxNumberOfParallelSubtasks,
+   int indexOfSubtask,
+   int numberOfParallelSubtasks,
+   int attemptNumber) {
+   this(
+   taskName,
+   maxNumberOfParallelSubtasks,
+   indexOfSubtask,
+   numberOfParallelSubtasks,
+   attemptNumber,
+   "UNKNOWN");
+   }
+
+   public TaskInfo(
+   String taskName,
+   int maxNumberOfParallelSubtasks,
+   int indexOfSubtask,
+   int numberOfParallelSubtasks,
+   int attemptNumber,
+   String allocationIDAsString) {
+
checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
@@ -49,6 +72,7 @@ public class TaskInfo {
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.attemptNumber = attemptNumber;
this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
+   this.allocationIDAsString = checkNotNull(allocationIDAsString);
}
 
/**
@@ -107,4 +131,12 @@ public class TaskInfo {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationIDAsString() {
+   return allocationIDAsString;
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fbe3cbfa/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6246e80..d6262c7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java

[6/7] flink git commit: [hotfix] Expose AllocationID as string through TaskInfo

2018-05-17 Thread srichter
[hotfix] Expose AllocationID as string through TaskInfo


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edece9c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edece9c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edece9c1

Branch: refs/heads/master
Commit: edece9c1e2ec8e759b783ea07dcaa50d4e8704a2
Parents: 8993599
Author: Stefan Richter 
Authored: Tue Mar 13 13:53:48 2018 +0100
Committer: Stefan Richter 
Committed: Thu May 17 10:03:04 2018 +0200

--
 .../org/apache/flink/api/common/TaskInfo.java   | 36 ++--
 .../util/AbstractRuntimeUDFContext.java |  7 
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 3 files changed, 43 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 33f2e0c..2583687 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
@@ -31,12 +31,35 @@ public class TaskInfo {
 
private final String taskName;
private final String taskNameWithSubtasks;
+   private final String allocationIDAsString;
private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
 
-   public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+   public TaskInfo(
+   String taskName,
+   int maxNumberOfParallelSubtasks,
+   int indexOfSubtask,
+   int numberOfParallelSubtasks,
+   int attemptNumber) {
+   this(
+   taskName,
+   maxNumberOfParallelSubtasks,
+   indexOfSubtask,
+   numberOfParallelSubtasks,
+   attemptNumber,
+   "UNKNOWN");
+   }
+
+   public TaskInfo(
+   String taskName,
+   int maxNumberOfParallelSubtasks,
+   int indexOfSubtask,
+   int numberOfParallelSubtasks,
+   int attemptNumber,
+   String allocationIDAsString) {
+
checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
@@ -49,6 +72,7 @@ public class TaskInfo {
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.attemptNumber = attemptNumber;
this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
+   this.allocationIDAsString = checkNotNull(allocationIDAsString);
}
 
/**
@@ -107,4 +131,12 @@ public class TaskInfo {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationIDAsString() {
+   return allocationIDAsString;
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6246e80..d6262c7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/uti