abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2006
Change subject: [NO ISSUE][TEST] Improve Job Failure Tests
......................................................................
[NO ISSUE][TEST] Improve Job Failure Tests
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Add a check that all started jobs have finished in multi-NC tests.
- Job cancellation is considered complete only when the job has
finally completed.
Change-Id: I9cdf53a88e07aaa3dc7cd11c5bb7ef9369835da6
Change-Id: I53282b04148fd0c24f55d5b87db49d705530b99b
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
6 files changed, 148 insertions(+), 4 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/06/2006/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index d36d9b7..cbcc44f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -132,6 +132,7 @@
@Override
public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus,
List<Exception> exceptions)
throws HyracksException {
+ LOGGER.log(level, "Getting notified of job finish for JobId: " +
jobId);
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId,
Pair.of(jobStatus, exceptions)));
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index b710f2b..c01bcbc 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.app.active;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -90,8 +89,8 @@
((QueryTranslator) statementExecutor).getSessionOutput(),
mdProvider, feed, feedConnections,
compilationProvider, storageComponentProvider,
statementExecutorFactory, hcc);
JobSpecification feedJob = jobInfo.getLeft();
- WaitForStateSubscriber eventSubscriber =
- new WaitForStateSubscriber(this,
Collections.singleton(ActivityState.RUNNING));
+ WaitForStateSubscriber eventSubscriber = new
WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING,
+ ActivityState.TEMPORARILY_FAILED,
ActivityState.PERMANENTLY_FAILED));
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
entityId);
// TODO(Yingyi): currently we do not check IFrameWriter protocol
violations for Feed jobs.
// We will need to design general exception handling mechanism for
feeds.
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index e03ee6e..b1191ec 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -165,6 +165,26 @@
}
@Test
+ public void testStartWhenStartFailsCompile() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.FAIL_COMPILE);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertFailure(action, 0);
+ Assert.assertEquals(ActivityState.PERMANENTLY_FAILED,
listener.getState());
+ }
+
+ @Test
+ public void testStartWhenStartFailsRuntime() throws Exception {
+ Assert.assertEquals(ActivityState.STOPPED, listener.getState());
+ listener.onStart(Behavior.FAIL_RUNTIME);
+ Action action = users[0].startActivity(listener);
+ action.sync();
+ assertFailure(action, 0);
+ Assert.assertEquals(ActivityState.PERMANENTLY_FAILED,
listener.getState());
+ }
+
+ @Test
public void testStartWhenOneNodeFinishesBeforeOtherNodeStarts() throws
Exception {
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
listener.onStart(Behavior.SUCCEED);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index bb85c13..b57d798 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -39,6 +39,7 @@
private JobStatus status;
private List<Exception> exceptions;
private IResultCallback<Void> callback;
+ private final StackTraceElement[] creationTrace;
public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus
status, List<Exception> exceptions,
IResultCallback<Void> callback) {
@@ -47,6 +48,7 @@
this.status = status;
this.exceptions = exceptions;
this.callback = callback;
+ creationTrace = Thread.currentThread().getStackTrace();
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 21c1e77..cc46a7d 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -58,6 +59,7 @@
public abstract class AbstractMultiNCIntegrationTest {
private static final Logger LOGGER =
Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
+ private static final TestJobLifecycleListener jobLifecycleListener = new
TestJobLifecycleListener();
public static final String[] ASTERIX_IDS =
{ "asterix-001", "asterix-002", "asterix-003", "asterix-004",
"asterix-005", "asterix-006", "asterix-007" };
@@ -92,7 +94,8 @@
ccConfig.setAppClass(DummyApplication.class.getName());
cc = new ClusterControllerService(ccConfig);
cc.start();
-
+ CCServiceContext serviceCtx = cc.getContext();
+ serviceCtx.addJobLifecycleListener(jobLifecycleListener);
asterixNCs = new NodeControllerService[ASTERIX_IDS.length];
for (int i = 0; i < ASTERIX_IDS.length; i++) {
File ioDev = new File("target" + File.separator + ASTERIX_IDS[i] +
File.separator + "ioDevice");
@@ -121,6 +124,7 @@
nc.stop();
}
cc.stop();
+ jobLifecycleListener.check();
}
protected JobId startJob(JobSpecification spec) throws Exception {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
new file mode 100644
index 0000000..c8d0b9c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+
+public class TestJobLifecycleListener implements IJobLifecycleListener {
+
+ private static final Logger LOGGER =
Logger.getLogger(TestJobLifecycleListener.class.getName());
+ private final Map<JobId, JobSpecification> created = new HashMap<>();
+ private final Set<JobId> started = new HashSet<>();
+ private final Set<JobId> finished = new HashSet<>();
+ private final Map<JobId, Integer> doubleCreated = new HashMap<>();
+ private final Map<JobId, Integer> doubleStarted = new HashMap<>();
+ private final Map<JobId, Integer> doubleFinished = new HashMap<>();
+ private final Set<JobId> startWithoutCreate = new HashSet<>();
+ private final Set<JobId> finishWithoutStart = new HashSet<>();
+
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) throws
HyracksException {
+ if (created.containsKey(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has been created
before");
+ increment(doubleCreated, jobId);
+ }
+ created.put(jobId, spec);
+ }
+
+ private void increment(Map<JobId, Integer> map, JobId jobId) {
+ Integer count = map.get(jobId);
+ count = count == null ? 2 : count + 1;
+ map.put(jobId, count);
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId) throws HyracksException {
+ if (!created.containsKey(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has not been created");
+ startWithoutCreate.add(jobId);
+ }
+ if (started.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has been started
before");
+ increment(doubleStarted, jobId);
+ }
+ started.add(jobId);
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId, JobStatus jobStatus,
List<Exception> exceptions) throws HyracksException {
+ if (!started.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "Job " + jobId + "has not been started");
+ finishWithoutStart.add(jobId);
+ }
+ if (finished.contains(jobId)) {
+ // TODO: job finish should be called once only when it has really
completed
+ // throw new HyracksDataException("Job " + jobId + "has been
finished before");
+ LOGGER.log(Level.WARNING, "Dangerous: Duplicate Job: " + jobId + "
has finished with status: " + jobStatus);
+ increment(doubleFinished, jobId);
+ }
+ finished.add(jobId);
+ }
+
+ public void check() throws Exception {
+ LOGGER.log(Level.WARNING, "Checking all created jobs have started");
+ for (JobId jobId : created.keySet()) {
+ if (!started.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "JobId " + jobId + " has been
created but never started");
+ }
+ }
+ LOGGER.log(Level.WARNING, "Checking all started jobs have terminated");
+ for (JobId jobId : started) {
+ if (!finished.contains(jobId)) {
+ LOGGER.log(Level.WARNING, "JobId " + jobId + " has started but
not finished");
+ }
+ }
+ LOGGER.log(Level.WARNING, "Checking multiple creates");
+ for (Entry<JobId, Integer> entry : doubleCreated.entrySet()) {
+ LOGGER.log(Level.WARNING, "job " + entry.getKey() + " has been
created " + entry.getValue() + " times");
+ }
+ LOGGER.log(Level.WARNING, "Checking multiple starts");
+ for (Entry<JobId, Integer> entry : doubleStarted.entrySet()) {
+ LOGGER.log(Level.WARNING, "job " + entry.getKey() + " has been
started " + entry.getValue() + " times");
+ }
+ LOGGER.log(Level.WARNING, "Checking multiple finishes");
+ for (Entry<JobId, Integer> entry : doubleFinished.entrySet()) {
+ LOGGER.log(Level.WARNING, "job " + entry.getKey() + " has been
finished " + entry.getValue() + " times");
+ }
+ LOGGER.log(Level.WARNING, "Done checking!");
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2006
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I53282b04148fd0c24f55d5b87db49d705530b99b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>