From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237 )
Change subject: [NO ISSUE][OTH] Logging improvements for job failure events
......................................................................
[NO ISSUE][OTH] Logging improvements for job failure events
- user model changes: no
- storage format changes: no
- interface changes: no
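Details:
- Include the job id in logs when aborting a job after a task failure and
  when removing uncommitted partitions / partition requests.
- ResultDirectoryService: do not re-log a job failure when the job's result
  record is already in the FAILED state.
- Remove duplicate/noisy failure logs (TaskFailureWork, NotifyTaskFailureWork,
  JobCleanupWork, duplicate-cleanup warning in JobManager, failure
  notification debug log in JobExecutor).
- Task: log a warning when an already-aborted task is not started.
- Use parameterized logging instead of string concatenation and level guards.
- PartitionMatchMaker cleanups (diamond operator, computeIfAbsent, lambdas,
  removeIf) and comment typo fixes ("lifecyle" -> "lifecycle").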
Change-Id: Ib1c99f00cde31224b0bcb86357c64d9c5404d2e7
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
8 files changed, 64 insertions(+), 86 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/37/18237/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acd..631f226 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@
private void abortTaskCluster(TaskClusterAttempt tcAttempt,
TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
- LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+ LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.trace(() -> "Abort map for job: " + jobId + ": " +
abortTaskAttemptMap);
+ LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
INodeManager nodeManager = ccs.getNodeManager();
abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
final NodeControllerState node =
nodeManager.getNodeControllerState(key);
if (node != null) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Aborting: " + abortTaskAttempts + " at " +
key);
- }
+ LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
try {
node.getNodeController().abortTasks(jobId,
abortTaskAttempts);
} catch (Exception e) {
@@ -579,8 +577,8 @@
inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
TaskCluster tc = tcAttempt.getTaskCluster();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
- pmm.removeUncommittedPartitions(tc.getProducedPartitions(),
abortTaskIds);
- pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+ pmm.removeUncommittedPartitions(tc.getProducedPartitions(),
abortTaskIds, jobId);
+ pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds,
jobId);
tcAttempt.setStatus(failedOrAbortedStatus);
tcAttempt.setEndTime(System.currentTimeMillis());
@@ -683,7 +681,6 @@
*/
public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
- LOGGER.debug("Received failure notification for TaskAttempt " +
ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@
LOGGER.trace(() -> "Marking TaskAttempt " +
ta.getTaskAttemptId()
+ " as failed and the number of max re-attempts = " +
maxReattempts);
if (lastAttempt.getAttempt() >= maxReattempts ||
isCancelled()) {
- LOGGER.debug(() -> "Aborting the job of " +
ta.getTaskAttemptId());
+ LOGGER.debug("Aborting the job:{} of {}",
jobRun.getJobId(), ta.getTaskAttemptId());
abortJob(exceptions, NoOpCallback.INSTANCE);
return;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..33bdd05 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -129,7 +129,7 @@
if (activeRunMap.containsKey(jobId)) {
JobRun jobRun = activeRunMap.get(jobId);
// The following call will abort all ongoing tasks and then
consequently
- // trigger JobCleanupWork and JobCleanupNotificationWork which
will update the lifecyle of the job.
+ // trigger JobCleanupWork and JobCleanupNotificationWork which
will update the lifecycle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
return;
@@ -139,7 +139,7 @@
if (jobRun != null) {
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
jobId));
- // Since the job has not been executed, we only need to update its
status and lifecyle here.
+ // Since the job has not been executed, we only need to update its
status and lifecycle here.
jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
@@ -170,7 +170,6 @@
return;
}
if (run.getPendingStatus() != null) {
- LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}",
run::getJobId);
return;
}
Set<String> targetNodes = run.getParticipatingNodeIds();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b53..76f8a17 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -28,6 +28,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -43,14 +44,13 @@
private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
public PartitionMatchMaker() {
- partitionDescriptors = new HashMap<PartitionId,
List<PartitionDescriptor>>();
- partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+ partitionDescriptors = new HashMap<>();
+ partitionRequests = new HashMap<>();
}
public List<Pair<PartitionDescriptor, PartitionRequest>>
registerPartitionDescriptor(
PartitionDescriptor partitionDescriptor) {
- List<Pair<PartitionDescriptor, PartitionRequest>> matches =
- new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+ List<Pair<PartitionDescriptor, PartitionRequest>> matches = new
ArrayList<>();
PartitionId pid = partitionDescriptor.getPartitionId();
boolean matched = false;
List<PartitionRequest> requests = partitionRequests.get(pid);
@@ -73,11 +73,7 @@
}
if (!matched) {
- List<PartitionDescriptor> descriptors =
partitionDescriptors.get(pid);
- if (descriptors == null) {
- descriptors = new ArrayList<PartitionDescriptor>();
- partitionDescriptors.put(pid, descriptors);
- }
+ List<PartitionDescriptor> descriptors =
partitionDescriptors.computeIfAbsent(pid, k -> new ArrayList<>());
descriptors.add(partitionDescriptor);
}
@@ -108,11 +104,7 @@
}
if (match == null) {
- List<PartitionRequest> requests = partitionRequests.get(pid);
- if (requests == null) {
- requests = new ArrayList<PartitionRequest>();
- partitionRequests.put(pid, requests);
- }
+ List<PartitionRequest> requests =
partitionRequests.computeIfAbsent(pid, k -> new ArrayList<>());
requests.add(partitionRequest);
}
@@ -133,17 +125,11 @@
}
private interface IEntryFilter<T> {
- public boolean matches(T o);
+ boolean matches(T o);
}
private static <T> void removeEntries(List<T> list, IEntryFilter<T>
filter) {
- Iterator<T> j = list.iterator();
- while (j.hasNext()) {
- T o = j.next();
- if (filter.matches(o)) {
- j.remove();
- }
- }
+ list.removeIf(filter::matches);
}
private static <T> void removeEntries(Map<PartitionId, List<T>> map,
IEntryFilter<T> filter) {
@@ -159,30 +145,17 @@
}
public void notifyNodeFailures(final Collection<String> deadNodes) {
- removeEntries(partitionDescriptors, new
IEntryFilter<PartitionDescriptor>() {
- @Override
- public boolean matches(PartitionDescriptor o) {
- return deadNodes.contains(o.getNodeId());
- }
- });
- removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
- @Override
- public boolean matches(PartitionRequest o) {
- return deadNodes.contains(o.getNodeId());
- }
- });
+ removeEntries(partitionDescriptors, o ->
deadNodes.contains(o.getNodeId()));
+ removeEntries(partitionRequests, o ->
deadNodes.contains(o.getNodeId()));
}
- public void removeUncommittedPartitions(Set<PartitionId> partitionIds,
final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+ public void removeUncommittedPartitions(Set<PartitionId> partitionIds,
final Set<TaskAttemptId> taIds,
+ JobId jobId) {
+ if (!partitionIds.isEmpty()) {
+ LOGGER.debug("Removing uncommitted partitions {}: {}", jobId,
partitionIds);
}
- IEntryFilter<PartitionDescriptor> filter = new
IEntryFilter<PartitionDescriptor>() {
- @Override
- public boolean matches(PartitionDescriptor o) {
- return o.getState() != PartitionState.COMMITTED &&
taIds.contains(o.getProducingTaskAttemptId());
- }
- };
+ IEntryFilter<PartitionDescriptor> filter =
+ o -> o.getState() != PartitionState.COMMITTED &&
taIds.contains(o.getProducingTaskAttemptId());
for (PartitionId pid : partitionIds) {
List<PartitionDescriptor> descriptors =
partitionDescriptors.get(pid);
if (descriptors != null) {
@@ -194,16 +167,11 @@
}
}
- public void removePartitionRequests(Set<PartitionId> partitionIds, final
Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing partition requests: " + partitionIds);
+ public void removePartitionRequests(Set<PartitionId> partitionIds, final
Set<TaskAttemptId> taIds, JobId jobId) {
+ if (!partitionIds.isEmpty()) {
+ LOGGER.debug("Removing partition requests {}: {}", jobId,
partitionIds);
}
- IEntryFilter<PartitionRequest> filter = new
IEntryFilter<PartitionRequest>() {
- @Override
- public boolean matches(PartitionRequest o) {
- return taIds.contains(o.getRequestingTaskAttemptId());
- }
- };
+ IEntryFilter<PartitionRequest> filter = o ->
taIds.contains(o.getRequestingTaskAttemptId());
for (PartitionId pid : partitionIds) {
List<PartitionRequest> requests = partitionRequests.get(pid);
if (requests != null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e2..06f972a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -45,7 +45,6 @@
import org.apache.hyracks.control.common.result.AbstractResultManager;
import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -79,9 +78,7 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, JobSpecification
spec) throws HyracksException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(getClass().getSimpleName() + " notified of new job "
+ jobId);
- }
+ LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(),
jobId);
if (jobResultLocations.get(jobId) != null) {
throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT,
jobId);
}
@@ -157,15 +154,19 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception>
exceptions) {
- Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
- Level logLevel = Level.DEBUG;
- if (LOGGER.isEnabled(logLevel)) {
- LOGGER.log(logLevel, "job " + jobId + " failed and is being
reported to " + getClass().getSimpleName(), ex);
- }
ResultJobRecord rjr = getResultJobRecord(jobId);
+ boolean logFailure = logFailure(rjr);
if (rjr != null) {
rjr.fail(exceptions);
}
+ Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
+ if (logFailure) {
+ if (ex == null) {
+ LOGGER.debug("job {} failed and is being reported to {}",
jobId, getClass().getSimpleName());
+ } else {
+ LOGGER.debug("job {} failed and is being reported to {}",
jobId, getClass().getSimpleName(), ex);
+ }
+ }
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
if (jobResultInfo != null) {
jobResultInfo.setException(ex);
@@ -211,6 +212,15 @@
}
}
+ private static boolean logFailure(ResultJobRecord rjr) {
+ if (rjr == null) {
+ return true;
+ }
+ // don't re-log if the state is already failed
+ ResultJobRecord.Status status = rjr.getStatus();
+ return status == null || status.getState() != State.FAILED;
+ }
+
/**
* Compares the records already known by the client for the given job's
result set id with the records that the
* result directory service knows and if there are any newly discovered
records returns a whole array with the
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 6262c47..f065940 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -52,7 +52,6 @@
@Override
public void run() {
- LOGGER.info("cleaning up {} on NCs, status={}", jobId, status);
final JobRun jobRun = jobManager.get(jobId);
if (jobRun == null) {
LOGGER.debug("ignoring cleanup for unknown {}", jobId);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066e..33d391f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,13 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
- private static final Logger LOGGER = LogManager.getLogger();
+
private final List<Exception> exceptions;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId,
TaskAttemptId taId, String nodeId,
@@ -43,9 +39,6 @@
@Override
protected void performEvent(TaskAttempt ta) {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG :
Level.WARN,
- "Executing task failure work for " + this, ex);
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
if (run == null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 0c5c233..f80bbf7 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -316,6 +316,7 @@
// the thread is not escaped from interruption.
if (!addPendingThread(ct)) {
exceptions.add(HyracksDataException.create(TASK_ABORTED,
getTaskAttemptId()));
+ LOGGER.warn("Not starting aborted task {}:{}", joblet.getJobId(),
taskAttemptId);
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
ncs.getWorkQueue()
.schedule(new NotifyTaskFailureWork(ncs, this, exceptions,
joblet.getJobId(), taskAttemptId));
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa..6dd4307 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,6 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
@@ -50,9 +49,6 @@
@Override
public void run() {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG :
Level.WARN, "task " + taskId + " has failed",
- ex);
try {
IResultPartitionManager resultPartitionManager =
ncs.getResultPartitionManager();
if (resultPartitionManager != null) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ib1c99f00cde31224b0bcb86357c64d9c5404d2e7
Gerrit-Change-Number: 18237
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange