Repository: hive
Updated Branches:
  refs/heads/master df51738ad -> 2de64b0b0


HIVE-14078: LLAP input split should get task attempt number from conf if 
available (Jason Dere, reviewed by Prasanth Jayachandran/Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2de64b0b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2de64b0b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2de64b0b

Branch: refs/heads/master
Commit: 2de64b0b0575264aa9716bc6fa824cd076884257
Parents: df51738
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Jun 30 13:14:03 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Jun 30 13:14:03 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseInputFormat.java   | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2de64b0b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index aef5762..c2fca54 100644
--- 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -137,15 +139,25 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
 
     LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
         new LlapRecordReaderTaskUmbilicalExternalResponder();
-    // TODO: close this
     LlapTaskUmbilicalExternalClient llapClient =
       new LlapTaskUmbilicalExternalClient(job, 
submitWorkInfo.getTokenIdentifier(),
           submitWorkInfo.getToken(), umbilicalResponder, llapToken);
     llapClient.init(job);
     llapClient.start();
 
+    int attemptNum = 0;
+    // Use task attempt number from conf if provided
+    TaskAttemptID taskAttemptId = 
TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID));
+    if (taskAttemptId != null) {
+      attemptNum = taskAttemptId.getId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting attempt number to " + attemptNum + " from task 
attempt ID in conf: " +
+            job.get(MRJobConfig.TASK_ATTEMPT_ID));
+      }
+    }
+
     SubmitWorkRequestProto request = constructSubmitWorkRequestProto(
-        submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(),
+        submitWorkInfo, llapSplit.getSplitNum(), attemptNum, 
llapClient.getAddress(),
         submitWorkInfo.getToken(), llapSplit.getFragmentBytes(),
         llapSplit.getFragmentBytesSignature());
     llapClient.submitWork(request, host, llapSubmitPort);
@@ -275,7 +287,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
   }
 
   private SubmitWorkRequestProto 
constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
-      int taskNum, InetSocketAddress address, Token<JobTokenIdentifier> token,
+      int taskNum, int attemptNum, InetSocketAddress address, 
Token<JobTokenIdentifier> token,
       byte[] fragmentBytes, byte[] fragmentBytesSignature) throws IOException {
     ApplicationId appId = submitWorkInfo.getFakeAppId();
 
@@ -284,7 +296,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     LOG.info("Setting user in submitWorkRequest to: " + user);
 
     ContainerId containerId =
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), 
taskNum);
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 
attemptNum), taskNum);
 
     // Credentials can change across DAGs. Ideally construct only once per DAG.
     Credentials credentials = new Credentials();
@@ -309,7 +321,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     }
     builder.setWorkSpec(vertexBuilder.build());
     builder.setFragmentNumber(taskNum);
-    builder.setAttemptNumber(0); // TODO: hmm
+    builder.setAttemptNumber(attemptNum);
     builder.setContainerIdString(containerId.toString());
     builder.setAmHost(address.getHostName());
     builder.setAmPort(address.getPort());

Reply via email to