Populate the error message from the running client's task and persist it into JobContext for better error reporting.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9508a1ac Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9508a1ac Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9508a1ac Branch: refs/heads/helix-0.6.x Commit: 9508a1acfae1d915148138daccb2abd5f9dce430 Parents: c3624e0 Author: Lei Xia <l...@linkedin.com> Authored: Thu May 5 11:25:22 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Tue Jul 5 16:18:34 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobContext.java | 51 ++++++-- .../org/apache/helix/task/JobRebalancer.java | 15 ++- .../java/org/apache/helix/task/TaskRunner.java | 5 +- .../org/apache/helix/task/TaskStateModel.java | 4 +- .../apache/helix/integration/task/MockTask.java | 23 +++- .../task/TestTaskErrorReporting.java | 117 +++++++++++++++++++ 6 files changed, 194 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/JobContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java index 2057f27..328fcc0 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java @@ -45,7 +45,8 @@ public class JobContext extends HelixProperty { TARGET, TASK_ID, ASSIGNED_PARTICIPANT, - NEXT_RETRY_TIME + NEXT_RETRY_TIME, + INFO } public JobContext(ZNRecord record) { @@ -76,8 +77,18 @@ public class JobContext extends HelixProperty { return Long.parseLong(tStr); } + public void setInfo(String info) { + if (info != null) { + _record.setSimpleField(ContextProperties.INFO.toString(), info); + } + } + + public String getInfo() { 
+ return _record.getSimpleField(ContextProperties.INFO.toString()); + } + public void setPartitionState(int p, TaskPartitionState s) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.STATE.toString(), s.name()); } @@ -95,7 +106,7 @@ public class JobContext extends HelixProperty { } public void setPartitionNumAttempts(int p, int n) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n)); } @@ -122,7 +133,7 @@ public class JobContext extends HelixProperty { } public void setPartitionStartTime(int p, long t) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.START_TIME.toString(), String.valueOf(t)); } @@ -139,7 +150,7 @@ public class JobContext extends HelixProperty { } public void setPartitionFinishTime(int p, long t) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t)); } @@ -156,7 +167,7 @@ public class JobContext extends HelixProperty { } public void setPartitionTarget(int p, String targetPName) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.TARGET.toString(), targetPName); } @@ -165,6 +176,16 @@ public class JobContext extends HelixProperty { return (map != null) ? map.get(ContextProperties.TARGET.toString()) : null; } + public void setPartitionInfo(int p, String info) { + Map<String, String> map = getMapField(p, true); + map.put(ContextProperties.INFO.toString(), info); + } + + public String getPartitionInfo(int p) { + Map<String, String> map = getMapField(p); + return (map != null) ? 
map.get(ContextProperties.INFO.toString()) : null; + } + public Map<String, List<Integer>> getPartitionsByTarget() { Map<String, List<Integer>> result = Maps.newHashMap(); for (Map.Entry<String, Map<String, String>> mapField : _record.getMapFields().entrySet()) { @@ -194,7 +215,7 @@ public class JobContext extends HelixProperty { } public void setTaskIdForPartition(int p, String taskId) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.TASK_ID.toString(), taskId); } @@ -216,7 +237,7 @@ public class JobContext extends HelixProperty { } public void setAssignedParticipant(int p, String participantName) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName); } @@ -226,7 +247,7 @@ public class JobContext extends HelixProperty { } public void setNextRetryTime(int p, long t) { - Map<String, String> map = getMapField(p); + Map<String, String> map = getMapField(p, true); map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t)); } @@ -242,10 +263,20 @@ public class JobContext extends HelixProperty { return Long.parseLong(tStr); } + /** + * Get MapField for the given partition. + * + * @param p + * @return mapField for the partition, NULL if the partition has not scheduled yet. 
+ */ public Map<String, String> getMapField(int p) { + return getMapField(p, false); + } + + private Map<String, String> getMapField(int p, boolean createIfNotPresent) { String pStr = String.valueOf(p); Map<String, String> map = _record.getMapField(pStr); - if (map == null) { + if (map == null && createIfNotPresent) { map = new TreeMap<String, String>(); _record.setMapField(pStr, map); } http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 0f34178..fae7ac7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -214,8 +214,9 @@ public class JobRebalancer extends TaskRebalancer { if (allPartitions == null || allPartitions.isEmpty()) { // Empty target partitions, mark the job as FAILED. - LOG.warn( - "Missing task partition mapping for job " + jobResource + ", marked the job as FAILED!"); + String failureMsg = "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!"; + LOG.info(failureMsg); + jobCtx.setInfo(failureMsg); markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false); return new ResourceAssignment(jobResource); @@ -266,6 +267,12 @@ public class JobRebalancer extends TaskRebalancer { pName), instance)); jobCtx.setPartitionState(pId, currState); + String taskMsg = currStateOutput.getInfo(jobResource, new Partition( + pName), instance); + if (taskMsg != null) { + jobCtx.setPartitionInfo(pId, taskMsg); + } + // Process any requested state transitions. 
String requestedStateStr = currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); @@ -318,8 +325,8 @@ public class JobRebalancer extends TaskRebalancer { case ERROR: { donePartitions.add(pId); // The task may be rescheduled on a different instance. LOG.debug(String.format( - "Task partition %s has error state %s. Marking as such in rebalancer context.", pName, - currState)); + "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName, + currState, taskMsg)); markPartitionError(jobCtx, pId, currState, true); // The error policy is to fail the task as soon a single partition fails for a specified // maximum number of attempts or task is in ABORTED state. http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java index c43d0ce..eabaf64 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java @@ -72,7 +72,7 @@ public class TaskRunner implements Runnable { throw death; } catch (Throwable t) { LOG.error("Problem running the task, report task as FAILED.", t); - _result = new TaskResult(Status.FAILED, null); + _result = new TaskResult(Status.FAILED, "Exception happened in running task: " + t.getMessage()); } switch (_result.getStatus()) { @@ -98,6 +98,9 @@ public class TaskRunner implements Runnable { throw new AssertionError("Unknown task result type: " + _result.getStatus().name()); } } catch (Exception e) { + LOG.error("Problem running the task, report task as FAILED.", e); + _result = + new TaskResult(Status.FAILED, "Exception happened in running task: " + e.getMessage()); requestStateTransition(TaskPartitionState.TASK_ERROR); } finally { 
synchronized (_doneSync) { http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index ba68a78..fd07176 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -87,7 +87,7 @@ public class TaskStateModel extends StateModel { } @Transition(to = "COMPLETED", from = "RUNNING") - public void onBecomeCompletedFromRunning(Message msg, NotificationContext context) { + public String onBecomeCompletedFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { throw new IllegalStateException(String.format( @@ -102,6 +102,8 @@ public class TaskStateModel extends StateModel { } timeout_task.cancel(false); + + return r.getInfo(); } @Transition(to = "TIMED_OUT", from = "RUNNING") http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java index 3fe1d6f..db0c8f4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java @@ -32,12 +32,14 @@ public class MockTask implements Task { public static final String TIMEOUT_CONFIG = "Timeout"; public static final String TASK_RESULT_STATUS = "TaskResultStatus"; public static final String THROW_EXCEPTION = "ThrowException"; + public static final String ERROR_MESSAGE = "ErrorMessage"; 
public static final String FAILURE_COUNT_BEFORE_SUCCESS = "FailureCountBeforeSuccess"; private final long _delay; private volatile boolean _canceled; private TaskResult.Status _taskResultStatus; private boolean _throwException; private int _expectedToSuccess; + private String _errorMsg; public MockTask(TaskCallbackContext context) { Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap(); @@ -59,8 +61,10 @@ public class MockTask implements Task { Boolean.valueOf(cfg.containsKey(THROW_EXCEPTION)) : false; _expectedToSuccess = - cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(cfg.get( - FAILURE_COUNT_BEFORE_SUCCESS)) : 0; + cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt( + cfg.get(FAILURE_COUNT_BEFORE_SUCCESS)) : 0; + + _errorMsg = cfg.containsKey(ERROR_MESSAGE) ? cfg.get(ERROR_MESSAGE) : null; } @Override @@ -77,12 +81,21 @@ public class MockTask implements Task { } timeLeft = expiry - System.currentTimeMillis(); - if (_throwException || _expectedToSuccess > 0) { + if (_throwException) { + _expectedToSuccess--; + if (_errorMsg == null) { + _errorMsg = "Test failed"; + } + throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed"); + } + + if (_expectedToSuccess > 0){ _expectedToSuccess--; - throw new RuntimeException("Test failed"); + throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed"); } - return new TaskResult(_taskResultStatus, String.valueOf(timeLeft < 0 ? 0 : timeLeft)); + return new TaskResult(_taskResultStatus, + _errorMsg != null ? _errorMsg : String.valueOf(timeLeft < 0 ? 
0 : timeLeft)); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/9508a1ac/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java new file mode 100644 index 0000000..906dcff --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java @@ -0,0 +1,117 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.TestHelper; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskPartitionState; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Test Error reporting for failed tasks + */ +public class TestTaskErrorReporting extends TaskTestBase { + + @Test + public void test() throws Exception { + int taskRetryCount = 1; + int num_tasks = 5; + + String jobResource = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = new JobConfig.Builder(); + jobBuilder.setCommand(MockTask.TASK_COMMAND).setTimeoutPerTask(10000) + .setMaxAttemptsPerTask(taskRetryCount).setFailureThreshold(Integer.MAX_VALUE); + + // create each task configs. 
+ final int abortedTask = 1; + final int failedTask = 2; + final int exceptionTask = 3; + + final String abortedMsg = "This task aborted, some terrible things must happened."; + final String failedMsg = "This task failed, something may be wrong."; + final String exceptionMsg = "This task throws exception."; + final String successMsg = "Yes, we did it!"; + + List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>(); + for (int j = 0; j < num_tasks; j++) { + TaskConfig.Builder configBuilder = new TaskConfig.Builder().setTaskId("task_" + j); + switch (j) { + case abortedTask: + configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name()) + .addConfig(MockTask.ERROR_MESSAGE, abortedMsg); + break; + case failedTask: + configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name()) + .addConfig(MockTask.ERROR_MESSAGE, failedMsg); + break; + case exceptionTask: + configBuilder.addConfig(MockTask.THROW_EXCEPTION, Boolean.TRUE.toString()) + .addConfig(MockTask.ERROR_MESSAGE, exceptionMsg); + break; + default: + configBuilder.addConfig(MockTask.ERROR_MESSAGE, successMsg); + break; + } + configBuilder.setTargetPartition(String.valueOf(j)); + taskConfigs.add(configBuilder.build()); + } + jobBuilder.addTaskConfigs(taskConfigs); + + Workflow flow = + WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); + + _driver.start(flow); + + // Wait until the job completes. 
+ TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED); + + JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)); + for (int i = 0; i < num_tasks; i++) { + TaskPartitionState state = ctx.getPartitionState(i); + String taskId = ctx.getTaskIdForPartition(i); + String errMsg = ctx.getPartitionInfo(i); + + if (taskId.equals("task_" + abortedTask)) { + Assert.assertEquals(state, TaskPartitionState.TASK_ABORTED); + Assert.assertEquals(errMsg, abortedMsg); + } else if (taskId.equals("task_" + failedTask)) { + Assert.assertEquals(state, TaskPartitionState.TASK_ERROR); + Assert.assertEquals(errMsg, failedMsg); + } else if (taskId.equals("task_" + exceptionTask)) { + Assert.assertEquals(state, TaskPartitionState.TASK_ERROR); + Assert.assertTrue(errMsg.contains(exceptionMsg)); + } else { + Assert.assertEquals(state, TaskPartitionState.COMPLETED); + Assert.assertEquals(errMsg, successMsg); + + } + } + } +}