Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1394
Change subject: WIP - async result
......................................................................
WIP - async result
Change-Id: Iafba65d9c7bd8643c42e5126c8d89164ae328908
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
A
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
A
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
8 files changed, 174 insertions(+), 11 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/94/1394/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index 144531e..cac9cd3 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -204,6 +204,7 @@
import
org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardPrefixDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedCheckDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.SleepDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SpatialAreaDescriptor;
import org.apache.asterix.runtime.evaluators.functions.SpatialCellDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
@@ -411,6 +412,9 @@
temp.add(OrderedListConstructorDescriptor.FACTORY);
temp.add(UnorderedListConstructorDescriptor.FACTORY);
+ // Sleep function
+ temp.add(SleepDescriptor.FACTORY);
+
// Inject failure function
temp.add(InjectFailureDescriptor.FACTORY);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
index c994abd..023fd0c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadFactory;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.algebra.base.ILangExtension.Language;
@@ -60,12 +61,12 @@
* @throws ClassNotFoundException
* @throws HyracksDataException
*/
- public CompilerExtensionManager(List<AsterixExtension> list)
+ public CompilerExtensionManager(List<AsterixExtension> list, ThreadFactory
threadFactory)
throws InstantiationException, IllegalAccessException,
ClassNotFoundException, HyracksDataException {
Pair<ExtensionId, ILangCompilationProvider> aqlcp = null;
Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
IStatementExecutorExtension see = null;
- defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory();
+ defaultQueryTranslatorFactory = new
DefaultStatementExecutorFactory(threadFactory);
if (list != null) {
for (AsterixExtension extensionConf : list) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index 6cdf329..8204c5f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -19,18 +19,30 @@
package org.apache.asterix.app.translator;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
public class DefaultStatementExecutorFactory implements
IStatementExecutorFactory {
+
+ protected final ThreadFactory threadFactory;
+
+ public DefaultStatementExecutorFactory() {
+ this(new HyracksThreadFactory("DefaultStatementExecutorFactory"));
+ }
+
+ public DefaultStatementExecutorFactory(ThreadFactory threadFactory) {
+ this.threadFactory = threadFactory;
+ }
@Override
public QueryTranslator create(List<Statement> aqlStatements, SessionConfig
conf,
ILangCompilationProvider compilationProvider) {
- return new QueryTranslator(aqlStatements, conf, compilationProvider);
+ return new QueryTranslator(aqlStatements, conf, compilationProvider,
threadFactory);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 643a352..bdf2075 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -37,6 +37,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -53,12 +54,12 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.result.ResultUtil;
-import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import
org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -226,15 +227,17 @@
protected final List<FunctionDecl> declaredFunctions;
protected final APIFramework apiFramework;
protected final IRewriterFactory rewriterFactory;
+ protected ThreadFactory threadFactory;
public QueryTranslator(List<Statement> statements, SessionConfig conf,
- ILangCompilationProvider compliationProvider) {
+ ILangCompilationProvider compliationProvider, ThreadFactory
threadFactory) {
this.statements = statements;
this.sessionConfig = conf;
this.declaredFunctions = getDeclaredFunctions(statements);
this.apiFramework = new APIFramework(compliationProvider);
this.rewriterFactory = compliationProvider.getRewriterFactory();
- activeDefaultDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+ this.activeDefaultDataverse =
MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+ this.threadFactory = threadFactory;
}
protected List<FunctionDecl> getDeclaredFunctions(List<Statement>
statements) {
@@ -2552,7 +2555,7 @@
jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds,
metadataProvider));
}
- protected JobSpecification handleQuery(MetadataProvider metadataProvider,
Query query,
+ protected void handleQuery(MetadataProvider metadataProvider, Query query,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats)
throws Exception {
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
@@ -2567,11 +2570,9 @@
if (query.isExplain()) {
sessionConfig.out().flush();
- return jobSpec;
} else if (sessionConfig.isExecuteQuery() && jobSpec != null) {
handleQueryResult(metadataProvider, hcc, hdc, jobSpec,
resultDelivery, stats);
}
- return jobSpec;
} catch (Exception e) {
LOGGER.log(Level.INFO, e.getMessage(), e);
if (bActiveTxn) {
@@ -2596,7 +2597,18 @@
switch (resultDelivery) {
case ASYNC:
ResultUtil.printResultHandle(new ResultHandle(jobId,
metadataProvider.getResultSetId()), sessionConfig);
- hcc.waitForCompletion(jobId);
+ if (threadFactory != null) {
+ threadFactory.newThread(() -> {
+ try {
+ hcc.waitForCompletion(jobId);
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
+ resultDelivery.name() + " job " + "with id
" + jobId + " failed");
+ }
+ });
+ } else {
+ hcc.waitForCompletion(jobId);
+ }
break;
case IMMEDIATE:
hcc.waitForCompletion(jobId);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 25a5418..32d90dd 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -103,7 +103,7 @@
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
GlobalRecoveryManager.instance(),
libraryManager, resourceIdManager, () ->
MetadataManager.INSTANCE);
- ccExtensionManager = new CompilerExtensionManager(getExtensions());
+ ccExtensionManager = new CompilerExtensionManager(getExtensions(),
appCtx.getThreadFactory());
AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
final CCConfig ccConfig = controllerService.getCCConfig();
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 089f804..711ea2f 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -91,6 +91,7 @@
import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer;
import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer;
import
org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType;
+import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer;
import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer;
import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer;
import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer;
@@ -647,6 +648,8 @@
"spatial-cell", 4);
public static final FunctionIdentifier SWITCH_CASE = new
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"switch-case", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier SLEEP = new
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sleep", 2);
public static final FunctionIdentifier INJECT_FAILURE = new
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"inject-failure", 2);
public static final FunctionIdentifier FLOW_RECORD = new
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1053,6 +1056,7 @@
addPrivateFunction(SUBSET_COLLECTION,
SubsetCollectionTypeComputer.INSTANCE, true);
addFunction(SUBSTRING, SubstringTypeComputer.INSTANCE, true);
addFunction(SWITCH_CASE, SwitchCaseComputer.INSTANCE, true);
+ addFunction(SLEEP, SleepTypeComputer.INSTANCE, false);
addPrivateFunction(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE,
true);
addPrivateFunction(CAST_TYPE, CastTypeComputer.INSTANCE, true);
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
new file mode 100644
index 0000000..7063698
--- /dev/null
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.exceptions.TypeMismatchException;
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class SleepTypeComputer extends AbstractResultTypeComputer {
+ public static final SleepTypeComputer INSTANCE = new SleepTypeComputer();
+
+ @Override
+ public void checkArgType(String funcName, int argIndex, IAType type)
throws AlgebricksException {
+ if (argIndex == 1) {
+ switch (type.getTypeTag()) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ break;
+ default:
+ throw new TypeMismatchException(funcName, argIndex,
type.getTypeTag(), ATypeTag.INT8, ATypeTag.INT16,
+ ATypeTag.INT32, ATypeTag.INT64);
+ }
+ }
+ }
+
+ @Override
+ public IAType getResultType(ILogicalExpression expr, IAType... types)
throws AlgebricksException {
+ return types[0];
+ }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
new file mode 100644
index 0000000..663a21d
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY =
SleepDescriptor::new;
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(final
IScalarEvaluatorFactory[] args) {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(final
IHyracksTaskContext ctx) throws HyracksDataException {
+ return new IScalarEvaluator() {
+
+ private IPointable argTime = new VoidPointable();
+ private final IScalarEvaluator evalValue =
args[0].createScalarEvaluator(ctx);
+ private final IScalarEvaluator evalTime =
args[1].createScalarEvaluator(ctx);
+
+ @Override
+ public void evaluate(IFrameTupleReference tuple,
IPointable result) throws HyracksDataException {
+ evalValue.evaluate(tuple, result);
+ evalTime.evaluate(tuple, argTime);
+
+ final byte[] bytes = argTime.getByteArray();
+ final int offset = argTime.getStartOffset();
+ final long time =
ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
+
+ try {
+ Thread.sleep(time / 1000000, (int) (time %
1000000));
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SLEEP;
+ }
+
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1394
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iafba65d9c7bd8643c42e5126c8d89164ae328908
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>