From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237 )
Change subject: [NO ISSUE][OTH] Logging improvements for job failure events ...................................................................... [NO ISSUE][OTH] Logging improvements for job failure events - user model changes: no - storage format changes: no - interface changes: no Details: Change-Id: Ib1c99f00cde31224b0bcb86357c64d9c5404d2e7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237 Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java 7 files changed, 67 insertions(+), 94 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ali Alsuliman: Looks good to me, but someone else must approve Jenkins: Verified diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index 3574acd..631f226 100644 
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -539,7 +539,7 @@ private void abortTaskCluster(TaskClusterAttempt tcAttempt, TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) { - LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt()); + LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt()); Set<TaskAttemptId> abortTaskIds = new HashSet<>(); Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>(); for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) { @@ -561,14 +561,12 @@ } } final JobId jobId = jobRun.getJobId(); - LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap); + LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap); INodeManager nodeManager = ccs.getNodeManager(); abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> { final NodeControllerState node = nodeManager.getNodeControllerState(key); if (node != null) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key); - } + LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key); try { node.getNodeController().abortTasks(jobId, abortTaskAttempts); } catch (Exception e) { @@ -579,8 +577,8 @@ inProgressTaskClusters.remove(tcAttempt.getTaskCluster()); TaskCluster tc = tcAttempt.getTaskCluster(); PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker(); - pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds); - pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds); + pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds, jobId); + pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds, jobId); tcAttempt.setStatus(failedOrAbortedStatus); 
tcAttempt.setEndTime(System.currentTimeMillis()); @@ -683,7 +681,6 @@ */ public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) { try { - LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId()); TaskAttemptId taId = ta.getTaskAttemptId(); TaskCluster tc = ta.getTask().getTaskCluster(); TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); @@ -696,7 +693,7 @@ LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed and the number of max re-attempts = " + maxReattempts); if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) { - LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId()); + LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId()); abortJob(exceptions, NoOpCallback.INSTANCE); return; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 4882f4a..33bdd05 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -129,7 +129,7 @@ if (activeRunMap.containsKey(jobId)) { JobRun jobRun = activeRunMap.get(jobId); // The following call will abort all ongoing tasks and then consequently - // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. + // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job. // Therefore, we do not remove the job out of activeRunMap here. 
jobRun.getExecutor().cancelJob(callback); return; @@ -139,7 +139,7 @@ if (jobRun != null) { List<Exception> exceptions = Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); - // Since the job has not been executed, we only need to update its status and lifecyle here. + // Since the job has not been executed, we only need to update its status and lifecycle here. jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); @@ -170,7 +170,6 @@ return; } if (run.getPendingStatus() != null) { - LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId); return; } Set<String> targetNodes = run.getParticipatingNodeIds(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java index ac29b53..8f91944 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.common.job.PartitionDescriptor; import org.apache.hyracks.control.common.job.PartitionRequest; @@ -43,14 +44,13 @@ private final Map<PartitionId, List<PartitionRequest>> partitionRequests; public PartitionMatchMaker() { - partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>(); - partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>(); + 
partitionDescriptors = new HashMap<>(); + partitionRequests = new HashMap<>(); } public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor( PartitionDescriptor partitionDescriptor) { - List<Pair<PartitionDescriptor, PartitionRequest>> matches = - new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>(); + List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<>(); PartitionId pid = partitionDescriptor.getPartitionId(); boolean matched = false; List<PartitionRequest> requests = partitionRequests.get(pid); @@ -73,11 +73,7 @@ } if (!matched) { - List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); - if (descriptors == null) { - descriptors = new ArrayList<PartitionDescriptor>(); - partitionDescriptors.put(pid, descriptors); - } + List<PartitionDescriptor> descriptors = partitionDescriptors.computeIfAbsent(pid, k -> new ArrayList<>()); descriptors.add(partitionDescriptor); } @@ -108,11 +104,7 @@ } if (match == null) { - List<PartitionRequest> requests = partitionRequests.get(pid); - if (requests == null) { - requests = new ArrayList<PartitionRequest>(); - partitionRequests.put(pid, requests); - } + List<PartitionRequest> requests = partitionRequests.computeIfAbsent(pid, k -> new ArrayList<>()); requests.add(partitionRequest); } @@ -133,17 +125,11 @@ } private interface IEntryFilter<T> { - public boolean matches(T o); + boolean matches(T o); } private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) { - Iterator<T> j = list.iterator(); - while (j.hasNext()) { - T o = j.next(); - if (filter.matches(o)) { - j.remove(); - } - } + list.removeIf(filter::matches); } private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) { @@ -159,30 +145,16 @@ } public void notifyNodeFailures(final Collection<String> deadNodes) { - removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() { - @Override - public boolean matches(PartitionDescriptor o) { 
- return deadNodes.contains(o.getNodeId()); - } - }); - removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() { - @Override - public boolean matches(PartitionRequest o) { - return deadNodes.contains(o.getNodeId()); - } - }); + removeEntries(partitionDescriptors, o -> deadNodes.contains(o.getNodeId())); + removeEntries(partitionRequests, o -> deadNodes.contains(o.getNodeId())); } - public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Removing uncommitted partitions: " + partitionIds); + public void removeUncommittedPartitions(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) { + if (!partitionIds.isEmpty()) { + LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, partitionIds); } - IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() { - @Override - public boolean matches(PartitionDescriptor o) { - return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId()); - } - }; + IEntryFilter<PartitionDescriptor> filter = + o -> o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId()); for (PartitionId pid : partitionIds) { List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); if (descriptors != null) { @@ -194,16 +166,11 @@ } } - public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Removing partition requests: " + partitionIds); + public void removePartitionRequests(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) { + if (!partitionIds.isEmpty()) { + LOGGER.debug("Removing partition requests {}: {}", jobId, partitionIds); } - IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() { - @Override - public boolean matches(PartitionRequest o) { - return 
taIds.contains(o.getRequestingTaskAttemptId()); - } - }; + IEntryFilter<PartitionRequest> filter = o -> taIds.contains(o.getRequestingTaskAttemptId()); for (PartitionId pid : partitionIds) { List<PartitionRequest> requests = partitionRequests.get(pid); if (requests != null) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index 9f8a7e2..46dd351 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -45,7 +45,6 @@ import org.apache.hyracks.control.common.result.AbstractResultManager; import org.apache.hyracks.control.common.result.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -79,9 +78,7 @@ @Override public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId); - } + LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId); if (jobResultLocations.get(jobId) != null) { throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId); } @@ -157,15 +154,14 @@ @Override public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { - Exception ex = exceptions.isEmpty() ? 
null : exceptions.get(0); - Level logLevel = Level.DEBUG; - if (LOGGER.isEnabled(logLevel)) { - LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex); - } ResultJobRecord rjr = getResultJobRecord(jobId); + if (logFailure(rjr)) { + LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName()); + } if (rjr != null) { rjr.fail(exceptions); } + Exception ex = exceptions.isEmpty() ? null : exceptions.get(0); final JobResultInfo jobResultInfo = jobResultLocations.get(jobId); if (jobResultInfo != null) { jobResultInfo.setException(ex); @@ -211,6 +207,15 @@ } } + private static boolean logFailure(ResultJobRecord rjr) { + if (rjr == null) { + return true; + } + // don't re-log if the state is already failed + ResultJobRecord.Status status = rjr.getStatus(); + return status == null || status.getState() != State.FAILED; + } + /** * Compares the records already known by the client for the given job's result set id with the records that the * result directory service knows and if there are any newly discovered records returns a whole array with the @@ -264,7 +269,7 @@ class JobResultInfo { - private ResultJobRecord record; + private final ResultJobRecord record; private Waiters waiters; private Exception exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index 77d2f82..6454804 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -34,11 +34,11 @@ public class JobCleanupWork extends AbstractWork { private static final Logger LOGGER = 
LogManager.getLogger(); - private IJobManager jobManager; - private JobId jobId; - private JobStatus status; - private List<Exception> exceptions; - private IResultCallback<Void> callback; + private final IJobManager jobManager; + private final JobId jobId; + private final JobStatus status; + private final List<Exception> exceptions; + private final IResultCallback<Void> callback; public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, IResultCallback<Void> callback) { @@ -51,9 +51,6 @@ @Override public void run() { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Cleanup for job: {}", jobId); - } final JobRun jobRun = jobManager.get(jobId); if (jobRun == null) { LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java index 833066e..33d391f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java @@ -22,17 +22,13 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.job.TaskAttempt; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class TaskFailureWork extends AbstractTaskLifecycleWork { - private static final Logger LOGGER = 
LogManager.getLogger(); + private final List<Exception> exceptions; public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, @@ -43,9 +39,6 @@ @Override protected void performEvent(TaskAttempt ta) { - Exception ex = exceptions.get(0); - LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, - "Executing task failure work for " + this, ex); IJobManager jobManager = ccs.getJobManager(); JobRun run = jobManager.get(jobId); if (run == null) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index b0c60aa..6dd4307 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -23,7 +23,6 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.result.IResultPartitionManager; -import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; @@ -50,9 +49,6 @@ @Override public void run() { - Exception ex = exceptions.get(0); - LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? 
Level.DEBUG : Level.WARN, "task " + taskId + " has failed", - ex); try { IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager(); if (resultPartitionManager != null) { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: Ib1c99f00cde31224b0bcb86357c64d9c5404d2e7 Gerrit-Change-Number: 18237 Gerrit-PatchSet: 7 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
