Populate the error message from a client's running task and persist it into 
JobContext for better error reporting.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9508a1ac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9508a1ac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9508a1ac

Branch: refs/heads/helix-0.6.x
Commit: 9508a1acfae1d915148138daccb2abd5f9dce430
Parents: c3624e0
Author: Lei Xia <l...@linkedin.com>
Authored: Thu May 5 11:25:22 2016 -0700
Committer: Lei Xia <l...@linkedin.com>
Committed: Tue Jul 5 16:18:34 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobContext.java  |  51 ++++++--
 .../org/apache/helix/task/JobRebalancer.java    |  15 ++-
 .../java/org/apache/helix/task/TaskRunner.java  |   5 +-
 .../org/apache/helix/task/TaskStateModel.java   |   4 +-
 .../apache/helix/integration/task/MockTask.java |  23 +++-
 .../task/TestTaskErrorReporting.java            | 117 +++++++++++++++++++
 6 files changed, 194 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java 
b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 2057f27..328fcc0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -45,7 +45,8 @@ public class JobContext extends HelixProperty {
     TARGET,
     TASK_ID,
     ASSIGNED_PARTICIPANT,
-    NEXT_RETRY_TIME
+    NEXT_RETRY_TIME,
+    INFO
   }
 
   public JobContext(ZNRecord record) {
@@ -76,8 +77,18 @@ public class JobContext extends HelixProperty {
     return Long.parseLong(tStr);
   }
 
+  public void setInfo(String info) {
+    if (info != null) {
+      _record.setSimpleField(ContextProperties.INFO.toString(), info);
+    }
+  }
+
+  public String getInfo() {
+    return _record.getSimpleField(ContextProperties.INFO.toString());
+  }
+
   public void setPartitionState(int p, TaskPartitionState s) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.STATE.toString(), s.name());
   }
 
@@ -95,7 +106,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionNumAttempts(int p, int n) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
   }
 
@@ -122,7 +133,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionStartTime(int p, long t) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.START_TIME.toString(), String.valueOf(t));
   }
 
@@ -139,7 +150,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionFinishTime(int p, long t) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
   }
 
@@ -156,7 +167,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionTarget(int p, String targetPName) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.TARGET.toString(), targetPName);
   }
 
@@ -165,6 +176,16 @@ public class JobContext extends HelixProperty {
     return (map != null) ? map.get(ContextProperties.TARGET.toString()) : null;
   }
 
+  public void setPartitionInfo(int p, String info) {
+    Map<String, String> map = getMapField(p, true);
+    map.put(ContextProperties.INFO.toString(), info);
+  }
+
+  public String getPartitionInfo(int p) {
+    Map<String, String> map = getMapField(p);
+    return (map != null) ? map.get(ContextProperties.INFO.toString()) : null;
+  }
+
   public Map<String, List<Integer>> getPartitionsByTarget() {
     Map<String, List<Integer>> result = Maps.newHashMap();
     for (Map.Entry<String, Map<String, String>> mapField : 
_record.getMapFields().entrySet()) {
@@ -194,7 +215,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setTaskIdForPartition(int p, String taskId) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.TASK_ID.toString(), taskId);
   }
 
@@ -216,7 +237,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setAssignedParticipant(int p, String participantName) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), 
participantName);
   }
 
@@ -226,7 +247,7 @@ public class JobContext extends HelixProperty {
   }
 
   public void setNextRetryTime(int p, long t) {
-    Map<String, String> map = getMapField(p);
+    Map<String, String> map = getMapField(p, true);
     map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
   }
 
@@ -242,10 +263,20 @@ public class JobContext extends HelixProperty {
     return Long.parseLong(tStr);
   }
 
+  /**
+   * Get MapField for the given partition.
+   *
+   * @param p
+   * @return mapField for the partition, NULL if the partition has not 
scheduled yet.
+   */
   public Map<String, String> getMapField(int p) {
+    return getMapField(p, false);
+  }
+
+  private Map<String, String> getMapField(int p, boolean createIfNotPresent) {
     String pStr = String.valueOf(p);
     Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
+    if (map == null && createIfNotPresent) {
       map = new TreeMap<String, String>();
       _record.setMapField(pStr, map);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0f34178..fae7ac7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -214,8 +214,9 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
-      LOG.warn(
-          "Missing task partition mapping for job " + jobResource + ", marked 
the job as FAILED!");
+      String failureMsg = "Empty task partition mapping for job " + 
jobResource + ", marked the job as FAILED!";
+      LOG.info(failureMsg);
+      jobCtx.setInfo(failureMsg);
       markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
       markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
       return new ResourceAssignment(jobResource);
@@ -266,6 +267,12 @@ public class JobRebalancer extends TaskRebalancer {
                 pName), instance));
         jobCtx.setPartitionState(pId, currState);
 
+        String taskMsg = currStateOutput.getInfo(jobResource, new Partition(
+            pName), instance);
+        if (taskMsg != null) {
+          jobCtx.setPartitionInfo(pId, taskMsg);
+        }
+
         // Process any requested state transitions.
         String requestedStateStr =
             currStateOutput.getRequestedState(jobResource, new 
Partition(pName), instance);
@@ -318,8 +325,8 @@ public class JobRebalancer extends TaskRebalancer {
         case ERROR: {
           donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
           LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in 
rebalancer context.", pName,
-              currState));
+              "Task partition %s has error state %s with msg %s. Marking as 
such in rebalancer context.", pName,
+              currState, taskMsg));
           markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition 
fails for a specified
           // maximum number of attempts or task is in ABORTED state.

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index c43d0ce..eabaf64 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -72,7 +72,7 @@ public class TaskRunner implements Runnable {
         throw death;
       } catch (Throwable t) {
         LOG.error("Problem running the task, report task as FAILED.", t);
-        _result = new TaskResult(Status.FAILED, null);
+        _result = new TaskResult(Status.FAILED, "Exception happened in running 
task: " + t.getMessage());
       }
 
       switch (_result.getStatus()) {
@@ -98,6 +98,9 @@ public class TaskRunner implements Runnable {
         throw new AssertionError("Unknown task result type: " + 
_result.getStatus().name());
       }
     } catch (Exception e) {
+      LOG.error("Problem running the task, report task as FAILED.", e);
+      _result =
+          new TaskResult(Status.FAILED, "Exception happened in running task: " 
+ e.getMessage());
       requestStateTransition(TaskPartitionState.TASK_ERROR);
     } finally {
       synchronized (_doneSync) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index ba68a78..fd07176 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -87,7 +87,7 @@ public class TaskStateModel extends StateModel {
   }
 
   @Transition(to = "COMPLETED", from = "RUNNING")
-  public void onBecomeCompletedFromRunning(Message msg, NotificationContext 
context) {
+  public String onBecomeCompletedFromRunning(Message msg, NotificationContext 
context) {
     String taskPartition = msg.getPartitionName();
     if (_taskRunner == null) {
       throw new IllegalStateException(String.format(
@@ -102,6 +102,8 @@ public class TaskStateModel extends StateModel {
     }
 
     timeout_task.cancel(false);
+
+    return r.getInfo();
   }
 
   @Transition(to = "TIMED_OUT", from = "RUNNING")

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java 
b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 3fe1d6f..db0c8f4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -32,12 +32,14 @@ public class MockTask implements Task {
   public static final String TIMEOUT_CONFIG = "Timeout";
   public static final String TASK_RESULT_STATUS = "TaskResultStatus";
   public static final String THROW_EXCEPTION = "ThrowException";
+  public static final String ERROR_MESSAGE = "ErrorMessage";
   public static final String FAILURE_COUNT_BEFORE_SUCCESS = 
"FailureCountBeforeSuccess";
   private final long _delay;
   private volatile boolean _canceled;
   private TaskResult.Status _taskResultStatus;
   private boolean _throwException;
   private int _expectedToSuccess;
+  private String _errorMsg;
 
   public MockTask(TaskCallbackContext context) {
     Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
@@ -59,8 +61,10 @@ public class MockTask implements Task {
         Boolean.valueOf(cfg.containsKey(THROW_EXCEPTION)) :
         false;
     _expectedToSuccess =
-        cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? 
Integer.parseInt(cfg.get(
-            FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
+        cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(
+            cfg.get(FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
+
+    _errorMsg = cfg.containsKey(ERROR_MESSAGE) ? cfg.get(ERROR_MESSAGE) : null;
   }
 
   @Override
@@ -77,12 +81,21 @@ public class MockTask implements Task {
     }
     timeLeft = expiry - System.currentTimeMillis();
 
-    if (_throwException || _expectedToSuccess > 0) {
+    if (_throwException) {
+      _expectedToSuccess--;
+      if (_errorMsg == null) {
+        _errorMsg = "Test failed";
+      }
+      throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test 
failed");
+    }
+
+    if (_expectedToSuccess > 0){
       _expectedToSuccess--;
-      throw new RuntimeException("Test failed");
+      throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test 
failed");
     }
 
-    return new TaskResult(_taskResultStatus, String.valueOf(timeLeft < 0 ? 0 : 
timeLeft));
+    return new TaskResult(_taskResultStatus,
+        _errorMsg != null ? _errorMsg : String.valueOf(timeLeft < 0 ? 0 : 
timeLeft));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
new file mode 100644
index 0000000..906dcff
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
@@ -0,0 +1,117 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test Error reporting for failed tasks
+ */
+public class TestTaskErrorReporting extends TaskTestBase {
+
+  @Test
+  public void test() throws Exception {
+    int taskRetryCount = 1;
+    int num_tasks = 5;
+
+    String jobResource = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder();
+    jobBuilder.setCommand(MockTask.TASK_COMMAND).setTimeoutPerTask(10000)
+        
.setMaxAttemptsPerTask(taskRetryCount).setFailureThreshold(Integer.MAX_VALUE);
+
+    // create each task configs.
+    final int abortedTask = 1;
+    final int failedTask = 2;
+    final int exceptionTask = 3;
+
+    final String abortedMsg = "This task aborted, some terrible things must 
happened.";
+    final String failedMsg = "This task failed, something may be wrong.";
+    final String exceptionMsg = "This task throws exception.";
+    final String successMsg = "Yes, we did it!";
+
+    List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+    for (int j = 0; j < num_tasks; j++) {
+      TaskConfig.Builder configBuilder = new 
TaskConfig.Builder().setTaskId("task_" + j);
+      switch (j) {
+      case abortedTask:
+        configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, 
TaskResult.Status.FATAL_FAILED.name())
+            .addConfig(MockTask.ERROR_MESSAGE, abortedMsg);
+        break;
+      case failedTask:
+        configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, 
TaskResult.Status.FAILED.name())
+            .addConfig(MockTask.ERROR_MESSAGE, failedMsg);
+        break;
+      case exceptionTask:
+        configBuilder.addConfig(MockTask.THROW_EXCEPTION, 
Boolean.TRUE.toString())
+            .addConfig(MockTask.ERROR_MESSAGE, exceptionMsg);
+        break;
+      default:
+        configBuilder.addConfig(MockTask.ERROR_MESSAGE, successMsg);
+        break;
+      }
+      configBuilder.setTargetPartition(String.valueOf(j));
+      taskConfigs.add(configBuilder.build());
+    }
+    jobBuilder.addTaskConfigs(taskConfigs);
+
+    Workflow flow =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, 
jobBuilder).build();
+
+    _driver.start(flow);
+
+    // Wait until the job completes.
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, 
TaskState.COMPLETED);
+
+    JobContext ctx = 
_driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
+    for (int i = 0; i < num_tasks; i++) {
+      TaskPartitionState state = ctx.getPartitionState(i);
+      String taskId = ctx.getTaskIdForPartition(i);
+      String errMsg = ctx.getPartitionInfo(i);
+
+      if (taskId.equals("task_" + abortedTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ABORTED);
+        Assert.assertEquals(errMsg, abortedMsg);
+      } else if (taskId.equals("task_" + failedTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
+        Assert.assertEquals(errMsg, failedMsg);
+      } else if (taskId.equals("task_" + exceptionTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
+        Assert.assertTrue(errMsg.contains(exceptionMsg));
+      } else {
+        Assert.assertEquals(state, TaskPartitionState.COMPLETED);
+        Assert.assertEquals(errMsg, successMsg);
+
+      }
+    }
+  }
+}

Reply via email to