Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2350

Change subject: [NO ISSUE][BAD] DeployedJobEventListener and test case fix
......................................................................

[NO ISSUE][BAD] DeployedJobEventListener and test case fix

1. The concurrent execution test case sometimes failed at result short.
The reason is the deployed job is removed before all invocations
finished. Added a sleep to the test case, also added running instance
check when dropping the procedure.
2. Test cases refactored to make the overall test time shorter.

Change-Id: I12ecf5c3c8f5a5c58fefa80673565c0ae3d1c9e6
---
M asterix-bad/pom.xml
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
M asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
M 
asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
M 
asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
M 
asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp
R 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp
R 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp
12 files changed, 74 insertions(+), 39 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad 
refs/changes/50/2350/1

diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index e94b912..291697d 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -28,7 +28,8 @@
     <asterix.version>0.9.3-SNAPSHOT</asterix.version>
     <hyracks.version>0.3.3-SNAPSHOT</hyracks.version>
     <source-format.skip>true</source-format.skip>
-  </properties>
+    
<testLog4jConfigFile>${root.dir}/../../asterix-app/src/test/resources/log4j2-test.xml</testLog4jConfigFile>
+    </properties>
     <build>
     <plugins>
       <plugin>
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index cd60b1a..0908edb 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -246,6 +246,7 @@
     private void setupDeployedJobSpec(EntityId entityId, JobSpecification 
jobSpec, IHyracksClientConnection hcc,
             DeployedJobSpecEventListener listener, ResultSetId resultSetId, 
IHyracksDataset hdc, Stats stats)
             throws Exception {
+        
jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, 
entityId);
         DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
         listener.storeDistributedInfo(deployedJobSpecId, null, hdc, 
resultSetId);
     }
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index b6c66dc..7ab7f95 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -18,12 +18,6 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
@@ -63,7 +57,15 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
+import java.io.DataOutput;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
 public class ExecuteProcedureStatement implements IExtensionStatement {
+
+    public static final String WAIT_FOR_COMPLETION = 
"wait-for-completion-procedure";
 
     private final String dataverseName;
     private final String procedureName;
@@ -118,6 +120,7 @@
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
+        JobId jobId;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
@@ -130,12 +133,18 @@
             if (procedure.getDuration().equals("")) {
 
                 //Add the Asterix Transaction Id to the map
+                long newTxId = TxnIdFactory.create().getId();
                 
contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
-                        
String.valueOf(TxnIdFactory.create().getId()).getBytes());
-                JobId jobId = hcc.startJob(deployedJobSpecId, 
contextRuntimeVarMap);
+                        String.valueOf(newTxId).getBytes());
+                jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
+
+                boolean wait = 
Boolean.parseBoolean(metadataProvider.getConfig().get(
+                        ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
+                if (wait || listener.getType() == PrecompiledType.QUERY) {
+                    hcc.waitForCompletion(jobId);
+                }
 
                 if (listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(jobId);
                     ResultReader resultReader =
                             new ResultReader(listener.getResultDataset(), 
jobId, listener.getResultId());
 
@@ -150,7 +159,6 @@
                 listener.storeDistributedInfo(deployedJobSpecId, ses, 
listener.getResultDataset(),
                         listener.getResultId());
             }
-
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             txnActive = false;
         } catch (Exception e) {
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 1555bea..f0eaced 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -93,7 +93,13 @@
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, 
dataverse, signature.getName());
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) 
activeEventHandler.getListener(entityId);
-        Procedure procedure = null;
+
+        if (listener.isActive()) {
+            throw new AlgebricksException("Cannot drop running procedure. 
There are " + listener.getRunningInstance()
+                    + " running instances.");
+        }
+
+        Procedure procedure;
 
         MetadataTransactionContext mdTxnCtx = null;
         try {
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 13f9e0d..070c148 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -18,19 +18,12 @@
  */
 package org.apache.asterix.bad.metadata;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.IDataset;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -41,6 +34,11 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 
 public class DeployedJobSpecEventListener implements 
IActiveEntityEventsListener {
 
@@ -62,11 +60,11 @@
 
     private DeployedJobSpecId deployedJobSpecId;
     private ScheduledExecutorService executorService = null;
-    private ResultReader resultReader;
     private final PrecompiledType type;
 
     private IHyracksDataset hdc;
     private ResultSetId resultSetId;
+
     // members
     protected volatile ActivityState state;
     protected JobId jobId;
@@ -79,7 +77,7 @@
     protected RequestState statsRequestState;
     protected final String runtimeName;
     protected final AlgebricksAbsolutePartitionConstraint locations;
-    protected int numRegistered;
+    private int runningInstance;
 
     public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId 
entityId, PrecompiledType type,
             AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName) {
@@ -92,7 +90,6 @@
         this.stats = "{\"Stats\":\"N/A\"}";
         this.runtimeName = runtimeName;
         this.locations = locations;
-        this.numRegistered = 0;
         state = ActivityState.STOPPED;
         this.type = type;
     }
@@ -108,15 +105,6 @@
 
     public DeployedJobSpecId getDeployedJobSpecId() {
         return deployedJobSpecId;
-    }
-
-    protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == 
ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
-            numRegistered++;
-            if (numRegistered == locations.getLocations().length) {
-                state = ActivityState.RUNNING;
-            }
-        }
     }
 
     @Override
@@ -182,10 +170,6 @@
         return locations;
     }
 
-    public ResultReader getResultReader() {
-        return resultReader;
-    }
-
     public PrecompiledType getType() {
         return type;
     }
@@ -234,12 +218,17 @@
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job started for  " + entityId);
         }
+        runningInstance++;
         state = ActivityState.RUNNING;
     }
 
     private synchronized void handleJobFinishEvent(ActiveEvent message) throws 
Exception {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job finished for  " + entityId);
+        }
+        runningInstance--;
+        if (runningInstance == 0) {
+            state = ActivityState.STOPPED;
         }
     }
 
@@ -266,4 +255,8 @@
     public String getDisplayName() throws HyracksDataException {
         return this.entityId.toString();
     }
+
+    public int getRunningInstance() {
+        return runningInstance;
+    }
 }
diff --git 
a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java 
b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
index bc24e9f..9701b2b 100644
--- 
a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
+++ 
b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.logging.Logger;
 
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -61,7 +62,7 @@
     public static void setUp() throws Exception {
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
-        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME, new 
AsterixHyracksIntegrationUtil(), false, null);
     }
 
     @AfterClass
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
index 11b7b33..a21a4be 100644
--- 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
@@ -43,4 +43,4 @@
 
 create broker brokerA at "http://www.notifyA.com";;
 
-create repetitive channel roomRecords using RoomOccupants@1 period 
duration("PT30S");
+create repetitive channel roomRecords using RoomOccupants@1 period 
duration("PT10S");
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
index d5f4290..c750707 100644
--- 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
@@ -22,4 +22,4 @@
 * Date         : Sep 2016
 * Author       : Steven Jacobs
 */
-630000
+110000
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
index cfe92c9..b9282fe 100644
--- 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
@@ -25,4 +25,4 @@
 
 use channels;
 
-(select value count(result) from roomRecordsResults)[0] > 19;
\ No newline at end of file
+(select value count(result) from roomRecordsResults)[0] > 9;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp
new file mode 100644
index 0000000..873e944
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : 3
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+1000
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp
similarity index 100%
rename from 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp
rename to 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp
similarity index 100%
rename from 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp
rename to 
asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2350
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I12ecf5c3c8f5a5c58fefa80673565c0ae3d1c9e6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>

Reply via email to