Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2611

Change subject: [WIP] Deployed Function
......................................................................

[WIP] Deployed Function

WIP

Change-Id: I3c5a2f4c458ea7e19632d9fb9b5cd77b4e64a71f
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DeployedFunctionUtils.java
M asterixdb/asterix-app/src/main/resources/cc.conf
M asterixdb/asterix-app/src/main/resources/log4j2.xml
M asterixdb/asterix-app/src/test/resources/cc.conf
M asterixdb/asterix-app/src/test/resources/log4j2-asterixdb-test.xml
M asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.3.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.4.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.4.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.4.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.4.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/deployed_function/deployed_function.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
A 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeployFunctionStatement.java
A 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InvokeFunctionStatement.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
A 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractDeployedFunctionCallFromSelectRule.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
48 files changed, 1,303 insertions(+), 116 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/11/2611/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 4f9b4df..0896678 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -424,17 +424,19 @@
         }
     }
 
+    /**
+     * Decides whether two types are considered a match.
+     * Returns false when either argument is null. Otherwise the type tags of
+     * Index.getNonNullableType(..).first of both arguments are compared: the types match when
+     * ATypeHierarchy.isSameTypeDomain accepts the two tags (with the given useListDomain flag), when the
+     * first tag is ATypeTag.ANY, or when ATypeHierarchy.canPromote accepts (first tag, second tag).
+     */
-    private boolean isMatched(IAType type1, IAType type2, boolean 
useListDomain) throws AlgebricksException {
+    private boolean isMatched(IAType type1, IAType type2, boolean 
useListDomain) {
         // Sanity check - two types can't be NULL in order to be matched.
         if (type1 == null || type2 == null) {
             return false;
         }
-        if 
(ATypeHierarchy.isSameTypeDomain(Index.getNonNullableType(type1).first.getTypeTag(),
-                Index.getNonNullableType(type2).first.getTypeTag(), 
useListDomain)) {
+        IAType nonNullableType1 = Index.getNonNullableType(type1).first;
+        IAType nonNullableType2 = Index.getNonNullableType(type2).first;
+        if (ATypeHierarchy.isSameTypeDomain(nonNullableType1.getTypeTag(), 
nonNullableType2.getTypeTag(),
+                useListDomain)) {
             return true;
         }
-        return 
ATypeHierarchy.canPromote(Index.getNonNullableType(type1).first.getTypeTag(),
-                Index.getNonNullableType(type2).first.getTypeTag());
+        // A first type tagged ANY is accepted without consulting canPromote.
+        return nonNullableType1.getTypeTag() == ATypeTag.ANY
+                || ATypeHierarchy.canPromote(nonNullableType1.getTypeTag(), 
nonNullableType2.getTypeTag());
     }
 
     /**
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index eafbdaf..c6cd264 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -569,9 +569,15 @@
                             indexedFieldTypeTag, index.isEnforced(), 
TypeCastingMathFunctionType.NONE);
                 }
             }
-            // No type-casting at all
+            // No manual type-casting happened, or the function returns ANY
             if (replacedConstantValue == null) {
-                return new Triple<>(constantAtRuntimeExpression, null, false);
+                AbstractFunctionCallExpression cast =
+                        new 
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CAST_TYPE));
+                cast.getArguments().add(new 
MutableObject<>(constantAtRuntimeExpression));
+                TypeCastUtils.setRequiredAndInputTypes(cast, 
TypeComputeUtils.getActualType(indexedFieldType),
+                        optFuncExpr.getConstantType(0));
+                return new Triple<>(cast, null, false);
+                //                return new 
Triple<>(constantAtRuntimeExpression, null, false);
             }
             // A type-casting happened, but not EQ case
             if (replacedConstantValueForEQCase == null) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index d22f657..58db40e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -530,6 +530,7 @@
             }
             errorCount = 0;
         } catch (Exception | TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError e) {
+            e.printStackTrace();
             handleExecuteStatementException(e, execution, param);
             response.setStatus(execution.getHttpStatus());
             ResultUtil.printError(sessionOutput.out(), e);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index ec128c2..55a4996 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -87,6 +87,15 @@
         new ResultPrinter(appCtx, output, stats, recordType).print(record);
     }
 
+    /**
+     * Appends {@code anyString} to the session output through {@code SessionOutput.appendHandle}.
+     * <p>
+     * The writer returned by {@code output.out()} is wrapped in an {@code AlgebricksAppendable}, which is handed to
+     * {@code appendHandle} together with the string.
+     * NOTE(review): an {@code AlgebricksException} raised while appending is dropped here, so callers get no
+     * signal that nothing was written -- confirm that this best-effort behaviour is intended.
+     *
+     * @param output session output to write to
+     * @param anyString text passed to {@code appendHandle}
+     */
+    public static void printString(SessionOutput output, String anyString) {
+        try {
+            output.appendHandle(new AlgebricksAppendable(output.out()), anyString);
+        } catch (AlgebricksException ignored) {
+            // Deliberately left unhandled, exactly as before; see the NOTE(review) in the Javadoc.
+        }
+    }
+
     public static void printResultHandle(SessionOutput output, ResultHandle 
handle) throws HyracksDataException {
         try {
             final AlgebricksAppendable app = new 
AlgebricksAppendable(output.out());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6a89bda..c6e0aeb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -91,6 +91,7 @@
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.statement.DeployFunctionStatement;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.ExternalDetailsDecl;
@@ -101,6 +102,7 @@
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.InvokeFunctionStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -145,10 +147,12 @@
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataLockUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.om.functions.IFunctionExtensionManager;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
+import org.apache.asterix.runtime.functions.FunctionManager;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
@@ -167,6 +171,7 @@
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
+import org.apache.asterix.utils.DeployedFunctionUtils;
 import org.apache.asterix.utils.FeedOperations;
 import org.apache.asterix.utils.FlushDatasetUtil;
 import org.apache.commons.lang3.StringUtils;
@@ -189,6 +194,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.UnmanagedFileSplit;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -332,7 +338,7 @@
                         handleCreateFunctionStatement(metadataProvider, stmt);
                         break;
                     case FUNCTION_DROP:
-                        handleFunctionDropStatement(metadataProvider, stmt);
+                        handleFunctionDropStatement(metadataProvider, stmt, 
hcc);
                         break;
                     case LOAD:
                         handleLoadStatement(metadataProvider, stmt, hcc);
@@ -397,6 +403,20 @@
                     case FUNCTION_DECL:
                         // No op
                         break;
+                    case DEPLOY:
+                        metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter));
+                        metadataProvider.setResultAsyncMode(false);
+                        metadataProvider.setMaxResultReads(maxResultReads);
+                        handleDeployStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case INVOKE:
+                        // Currently, multiple invocations in a single query 
read result from the same result dataset.
+                        // Only the last invocation will be kept.
+                        metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter));
+                        metadataProvider.setResultAsyncMode(false);
+                        metadataProvider.setMaxResultReads(maxResultReads);
+                        handleInvokeStatement(metadataProvider, stmt, hcc, 
hdc, stats);
+                        break;
                     case EXTENSION:
                         ((ExtensionStatement) stmt).handle(hcc, this, 
requestParameters, metadataProvider,
                                 resultSetIdCounter);
@@ -407,6 +427,69 @@
             }
         } finally {
             Thread.currentThread().setName(threadName);
+        }
+    }
+
+    /**
+     * Handles an INVOKE statement: starts the job spec that was deployed for the called function, waits for the
+     * job to complete and prints its result to the session output.
+     * <p>
+     * Every lookup runs inside the {@code try} block, so a missing deployment or a missing function declaration
+     * aborts the metadata transaction begun here and reaches the {@code finally} that releases the locks, instead
+     * of leaving the transaction open.
+     *
+     * @param metadataProvider receives the metadata transaction context; supplies the result set id, the output
+     *            record type and the transaction id factory
+     * @param stmt the statement to run; cast to {@link InvokeFunctionStatement}
+     * @param hcc connection used to start the deployed job and wait for its completion
+     * @param hdc dataset client handed to the result reader
+     * @param stats passed to {@code updateJobStats} and to the result printer
+     * @throws Exception if no deployed job spec or no function declaration exists for the call's signature, or if
+     *             starting, running or printing the job fails
+     */
+    protected void handleInvokeStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
+            InvokeFunctionStatement ifs = (InvokeFunctionStatement) stmt;
+            FunctionSignature signature = ifs.getFunctionCallExpr().getFunctionSignature();
+            FunctionManager fnManager =
+                    (FunctionManager) ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager();
+            DeployedJobSpecId deployedJobSpecId = fnManager.lookupDeployedFunctionJobSpecId(signature);
+            if (deployedJobSpecId == null) {
+                throw new HyracksDataException("Cannot find deployed function for " + signature);
+            }
+            Function function = FunctionUtil.lookupUserDefinedFunctionDecl(mdTxnCtx, signature);
+            if (function == null) {
+                // Same guard and message as handleDeployStatement; avoids handing null to populateJobParameter.
+                throw new HyracksDataException("Cannot find function " + signature);
+            }
+            Map<byte[], byte[]> jobParameter = DeployedFunctionUtils.populateJobParameter(
+                    ifs.getFunctionCallExpr().getExprList(), function, metadataProvider.getTxnIdFactory().create());
+            JobId hyracksJobId = hcc.startJob(deployedJobSpecId, jobParameter);
+            IResultPrinter resultPrinter = id -> {
+                final ResultReader resultReader = new ResultReader(hdc, id, metadataProvider.getResultSetId());
+                updateJobStats(id, stats);
+                sessionOutput.release();
+                ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
+                        metadataProvider.findOutputRecordType());
+            };
+            hcc.waitForCompletion(hyracksJobId);
+            resultPrinter.print(hyracksJobId);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    /**
+     * Handles a DEPLOY statement: looks up the user-defined function named by the statement's signature, turns
+     * it into a job specification with {@code DeployedFunctionUtils.makeDeployedJobSpecification}, deploys that
+     * specification through {@code hcc} and registers the returned id with the function manager.
+     * <p>
+     * The metadata transaction begun here is committed on success and aborted on any exception; the metadata
+     * provider's locks are unlocked in both cases.
+     *
+     * @param metadataProvider receives the metadata transaction context and is used to compile the function body
+     * @param stmt the statement to run; cast to {@link DeployFunctionStatement}
+     * @param hcc connection used to compile and deploy the job specification
+     * @throws Exception if the function cannot be found, or if compiling, deploying or registering fails
+     */
+    protected void handleDeployStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        final MetadataTransactionContext txnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(txnCtx);
+        final IFunctionExtensionManager extensionManager = (IFunctionExtensionManager) appCtx.getExtensionManager();
+        final FunctionManager functionManager = (FunctionManager) extensionManager.getFunctionManager();
+        final DeployFunctionStatement deployStmt = (DeployFunctionStatement) stmt;
+        try {
+            final FunctionSignature functionSignature = deployStmt.getSignature();
+            final Function declaredFunction = FunctionUtil.lookupUserDefinedFunctionDecl(txnCtx, functionSignature);
+            if (declaredFunction == null) {
+                throw new HyracksDataException("Cannot find function " + functionSignature);
+            }
+            final DeployedJobSpecId specId = hcc.deployJobSpec(
+                    DeployedFunctionUtils.makeDeployedJobSpecification(declaredFunction, metadataProvider, hcc, this));
+            functionManager.registerDeployedFunctionJobSpecId(deployStmt.getSignature(), specId);
+            MetadataManager.INSTANCE.commitTransaction(txnCtx);
+        } catch (Exception e) {
+            abort(e, e, txnCtx);
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1213,10 +1296,14 @@
             // # check whether any function in current dataverse is being used 
by others
             List<Function> functionsInDataverse =
                     MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, 
dataverseName);
+            FunctionManager fnManager =
+                    (FunctionManager) ((IFunctionExtensionManager) 
appCtx.getExtensionManager()).getFunctionManager();
             for (Function function : functionsInDataverse) {
                 if (isFunctionUsed(mdTxnCtx, function.getSignature(), 
dataverseName)) {
                     throw new 
MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE,
                             function.getDataverseName() + "." + 
function.getName() + "@" + function.getArity());
+                } else {
+                    DeployedFunctionUtils.dropDeployedFunction(fnManager, 
function.getSignature(), hcc);
                 }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1734,7 +1821,8 @@
         return false;
     }
 
-    protected void handleFunctionDropStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
+    protected void handleFunctionDropStatement(MetadataProvider 
metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
         FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
         FunctionSignature signature = stmtDropFunction.getFunctionSignature();
         
signature.setNamespace(getActiveDataverseName(signature.getNamespace()));
@@ -1749,6 +1837,10 @@
             } else if (isFunctionUsed(mdTxnCtx, signature, null)) {
                 throw new 
MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE, signature);
             } else {
+                FunctionManager fnManager = (FunctionManager) 
((IFunctionExtensionManager) appCtx.getExtensionManager())
+                        .getFunctionManager();
+                DeployedFunctionUtils.dropDeployedFunction(fnManager, 
signature, hcc);
+                fnManager.deRegisterDeployedFunctionJobSpecId(signature);
                 MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DeployedFunctionUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DeployedFunctionUtils.java
new file mode 100644
index 0000000..80bbe7c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DeployedFunctionUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.utils;
+
+import java.rmi.RemoteException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.transactions.TxnId;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.lang.common.util.LangRecordParseUtil;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.runtime.functions.FunctionManager;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Static helpers for deployed functions: building the job specification for a function body, packing the
+ * arguments of an invocation into job parameters, and dropping a deployed function.
+ */
+public class DeployedFunctionUtils {
+
+    private DeployedFunctionUtils() {
+        // static helpers only, not meant to be instantiated
+    }
+
+    /**
+     * Wraps a function body into a standalone query in which every declared argument is bound, through a
+     * leading {@code let} clause, to a {@code BuiltinFunctions.GET_JOB_PARAMETER} call keyed by the full
+     * argument name.
+     *
+     * @param function function whose argument names are bound
+     * @param functionBody body text appended after the bindings
+     * @return the bindings (if any), followed by the body and a terminating {@code ;}
+     */
+    private static String makeFunctionBodyQuery(Function function, String functionBody) {
+        List<String> argNames = function.getArguments();
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < argNames.size(); i++) {
+            // "let " opens the clause; every following binding is separated by ","
+            sb.append(i == 0 ? "let " : ",");
+            String param = argNames.get(i);
+            // substring to remove '$' in parameter name
+            sb.append(param.substring(1)).append(" = `").append(BuiltinFunctions.GET_JOB_PARAMETER.getName())
+                    .append("`(\"").append(param).append("\") ");
+        }
+        sb.append(functionBody);
+        sb.append(";");
+        return sb.toString();
+    }
+
+    /**
+     * Compiles the body of {@code function} into a job specification.
+     * <p>
+     * Sets {@code FunctionUtil.IMPORT_PRIVATE_FUNCTIONS} to {@code "true"} in the metadata provider's config,
+     * parses the generated query with a {@code SqlppParserFactory} parser, and passes the first parsed statement
+     * (cast to {@code Query}) to {@code statementExecutor.rewriteCompileQuery}.
+     *
+     * @param function function whose body is compiled
+     * @param metadataProvider metadata provider whose config is updated and which is used for compilation
+     * @param hcc connection handed to {@code rewriteCompileQuery}
+     * @param statementExecutor executor that rewrites and compiles the query
+     * @return the job specification returned by {@code rewriteCompileQuery}
+     * @throws AlgebricksException declared by the parse/compile calls
+     * @throws RemoteException declared by the parse/compile calls
+     */
+    public static JobSpecification makeDeployedJobSpecification(Function function, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IStatementExecutor statementExecutor)
+            throws AlgebricksException, RemoteException {
+        metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, Boolean.TRUE.toString());
+        SqlppParserFactory parserFactory = new SqlppParserFactory();
+        String functionQuery = makeFunctionBodyQuery(function, function.getFunctionBody());
+        IParser parser = parserFactory.createParser(functionQuery);
+        List<Statement> stmts = parser.parse();
+        Query query = (Query) stmts.get(0);
+        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, query, null);
+    }
+
+    /**
+     * Builds the job parameter map for one invocation of a deployed function.
+     * <p>
+     * For each declared argument, the name is serialized with {@code AStringSerializerDeserializer} as the key and
+     * the matching expression in {@code params} is serialized with {@code LangRecordParseUtil.parseExpression} as
+     * the value. The transaction id is added under {@code JobEventListenerFactory.TRANSACTION_ID_PARAMETER_NAME}.
+     *
+     * @param params argument expressions of the invocation, in declaration order
+     * @param function function whose declared argument names are used as keys
+     * @param txnId transaction id stored alongside the arguments
+     * @return a new map holding one entry per declared argument plus the transaction id entry
+     * @throws HyracksDataException if fewer expressions than declared arguments are supplied, or if
+     *             serialization fails
+     * @throws CompilationException declared by the expression serialization
+     */
+    public static Map<byte[], byte[]> populateJobParameter(List<Expression> params, Function function, TxnId txnId)
+            throws HyracksDataException, CompilationException {
+        Map<byte[], byte[]> jobParams = new HashMap<>();
+        List<String> args = function.getArguments();
+        if (params.size() < args.size()) {
+            // Fail with a readable message instead of an IndexOutOfBoundsException from params.get below.
+            throw new HyracksDataException("Function " + function.getName() + " expects " + args.size()
+                    + " argument(s) but only " + params.size() + " were supplied");
+        }
+        ArrayBackedValueStorage abvsKey = new ArrayBackedValueStorage();
+        ArrayBackedValueStorage abvsValue = new ArrayBackedValueStorage();
+        AMutableString paramName = new AMutableString("");
+        for (int i = 0; i < args.size(); i++) {
+            //Make argument name
+            paramName.setValue(args.get(i));
+            abvsKey.reset();
+            AStringSerializerDeserializer.INSTANCE.serialize(paramName, abvsKey.getDataOutput());
+            byte[] key = new byte[abvsKey.getLength()];
+            // Copy from the storage's start offset, the same way the value is copied below.
+            System.arraycopy(abvsKey.getByteArray(), abvsKey.getStartOffset(), key, 0, abvsKey.getLength());
+
+            //Make argument value
+            abvsValue.reset();
+            LangRecordParseUtil.parseExpression(params.get(i), abvsValue);
+            byte[] value = new byte[abvsValue.getLength()];
+            System.arraycopy(abvsValue.getByteArray(), abvsValue.getStartOffset(), value, 0, abvsValue.getLength());
+            jobParams.put(key, value);
+        }
+        // NOTE(review): getBytes() uses the platform default charset; the payload is the decimal txn id --
+        // confirm the reader decodes it the same way.
+        jobParams.put(JobEventListenerFactory.TRANSACTION_ID_PARAMETER_NAME, String.valueOf(txnId.getId()).getBytes());
+        return jobParams;
+    }
+
+    /**
+     * Removes the deployment of a function, if there is one: when {@code fnManager} knows a job spec id for
+     * {@code signature}, the id is de-registered and the job spec is undeployed through {@code hcc}. Does nothing
+     * when no id is registered.
+     *
+     * @param fnManager registry of deployed function job spec ids
+     * @param signature signature of the function to drop
+     * @param hcc connection used to undeploy the job spec
+     * @throws Exception declared for the undeploy call
+     */
+    public static void dropDeployedFunction(FunctionManager fnManager, FunctionSignature signature,
+            IHyracksClientConnection hcc) throws Exception {
+        // the concurrent deletion is guarded by metadata locks
+        DeployedJobSpecId jobSpecId = fnManager.lookupDeployedFunctionJobSpecId(signature);
+        if (jobSpecId != null) {
+            fnManager.deRegisterDeployedFunctionJobSpecId(signature);
+            hcc.undeployJobSpec(jobSpecId);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf 
b/asterixdb/asterix-app/src/main/resources/cc.conf
index 0d9f54f..5851787 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -43,7 +43,7 @@
 address = 127.0.0.1
 app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
 heartbeat.period=2000
-heartbeat.max.misses=25
+heartbeat.max.misses=9999
 
 [common]
 log.level = INFO
diff --git a/asterixdb/asterix-app/src/main/resources/log4j2.xml 
b/asterixdb/asterix-app/src/main/resources/log4j2.xml
index 1debf82..649bf5e 100644
--- a/asterixdb/asterix-app/src/main/resources/log4j2.xml
+++ b/asterixdb/asterix-app/src/main/resources/log4j2.xml
@@ -27,5 +27,8 @@
       <AppenderRef ref="Console"/>
     </Root>
     <Logger name="org.apache.hyracks.control.nc.service" level="INFO"/>
+    <Logger name="org.apache.hyracks.algebricks" level="DEBUG">
+      <AppenderRef ref="ConsoleTest"/>
+    </Logger>
   </Loggers>
 </Configuration>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf 
b/asterixdb/asterix-app/src/test/resources/cc.conf
index fc95dd4..51e5ddf 100644
--- a/asterixdb/asterix-app/src/test/resources/cc.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc.conf
@@ -42,7 +42,7 @@
 [cc]
 address = 127.0.0.1
 app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
-heartbeat.period=2000
+heartbeat.period=20000000
 heartbeat.max.misses=25
 
 [common]
diff --git a/asterixdb/asterix-app/src/test/resources/log4j2-asterixdb-test.xml 
b/asterixdb/asterix-app/src/test/resources/log4j2-asterixdb-test.xml
index c6ecd7d..c531c04 100644
--- a/asterixdb/asterix-app/src/test/resources/log4j2-asterixdb-test.xml
+++ b/asterixdb/asterix-app/src/test/resources/log4j2-asterixdb-test.xml
@@ -41,5 +41,14 @@
     <Logger name="org.apache.asterix.test" level="INFO">
       <AppenderRef ref="ConsoleTest"/>
     </Logger>
+    <Logger name="org.apache.hyracks.algebricks" level="DEBUG">
+      <AppenderRef ref="ConsoleTest"/>
+    </Logger>
+    <!--<Logger name="org.apache.hyracks.control.cc" level="INFO">-->
+      <!--<AppenderRef ref="Console"/>-->
+    <!--</Logger>-->
+    <!--<Logger name="org.apache.hyracks.control.cc.executor" level="INFO">-->
+      <!--<AppenderRef ref="Console"/>-->
+    <!--</Logger>-->
   </Loggers>
 </Configuration>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
index 334dd52..c2549b6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
@@ -19,5 +19,20 @@
  !-->
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" 
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" 
QueryFileExtension=".sqlpp">
   <test-group name="failed">
+    <!--<test-case FilePath="user-defined-functions">-->
+      <!--<compilation-unit name="deployed_function_secondary_idx_lookup">-->
+        <!--<output-dir compare="Text">deployed_function</output-dir>-->
+      <!--</compilation-unit>-->
+    <!--</test-case>-->
+    <!--<test-case FilePath="user-defined-functions">-->
+      <!--<compilation-unit name="deployed_function_non_key_lookup">-->
+        <!--<output-dir compare="Text">deployed_function</output-dir>-->
+      <!--</compilation-unit>-->
+    <!--</test-case>-->
+    <!--<test-case FilePath="user-defined-functions">-->
+      <!--<compilation-unit name="deployed_function_primary_key_lookup">-->
+        <!--<output-dir compare="Text">deployed_function</output-dir>-->
+      <!--</compilation-unit>-->
+    <!--</test-case>-->
   </test-group>
 </test-suite>
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.1.ddl.sqlpp
new file mode 100644
index 0000000..28c57e5
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.1.ddl.sqlpp
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : integer,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+};
+
+create type tpch.OrderType as
+ closed {
+  o_orderkey : integer,
+  o_custkey : integer,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : integer,
+  o_comment : string
+};
+
+create type tpch.CustomerType as
+ closed {
+  c_custkey : integer,
+  c_name : string,
+  c_address : string,
+  c_nationkey : integer,
+  c_phone : string,
+  c_acctbal : double,
+  c_mktsegment : string,
+  c_comment : string
+};
+
+create type tpch.SupplierType as
+ closed {
+  s_suppkey : integer,
+  s_name : string,
+  s_address : string,
+  s_nationkey : integer,
+  s_phone : string,
+  s_acctbal : double,
+  s_comment : string
+};
+
+create type tpch.NationType as
+ closed {
+  n_nationkey : integer,
+  n_name : string,
+  n_regionkey : integer,
+  n_comment : string
+};
+
+create type tpch.RegionType as
+ closed {
+  r_regionkey : integer,
+  r_name : string,
+  r_comment : string
+};
+
+create type tpch.PartType as
+ closed {
+  p_partkey : integer,
+  p_name : string,
+  p_mfgr : string,
+  p_brand : string,
+  p_type : string,
+  p_size : integer,
+  p_container : string,
+  p_retailprice : double,
+  p_comment : string
+};
+
+create type tpch.PartSuppType as
+ closed {
+  ps_partkey : integer,
+  ps_suppkey : integer,
+  ps_availqty : integer,
+  ps_supplycost : double,
+  ps_comment : string
+};
+
+create type testType as open {
+    id: integer
+};
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create  dataset Orders(OrderType) primary key o_orderkey;
+
+create  dataset Supplier(SupplierType) primary key s_suppkey;
+//create  dataset Supplier(SupplierType) primary key s_name;
+
+create  dataset Region(RegionType) primary key r_regionkey;
+
+create  dataset Nation(NationType) primary key n_nationkey;
+
+create  dataset Part(PartType) primary key p_partkey;
+
+create  dataset Partsupp(PartSuppType) primary key ps_partkey,ps_suppkey;
+
+create  dataset Customer(CustomerType) primary key c_custkey;
+
+create dataset testDataset(testType) primary key id;
+
+load  dataset Supplier using localfs 
((`path`=`asterix_nc1://data/tpch0.001/supplier.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+create function primary_key(x) {
+  select * from Supplier n where n.s_suppkey = x
+};
+
+
+// not supported yet
+//create function primary_key(x) {
+//    select spatial_distance(s.location, x) from testDataset s
+//};
+//
+//insert into testDataset([
+//{"id" : 1, "location" : point("1, 0")},
+//{"id" : 2, "location" : point("0, 0")},
+//{"id" : 3, "location" : point("-1, 0")},
+//{"id" : 4, "location" : point("0, -1")}
+//]);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.2.update.sqlpp
new file mode 100644
index 0000000..452a4b3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use tpch;
+deploy function primary_key@1;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.3.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.3.update.sqlpp
new file mode 100644
index 0000000..22d4632
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.3.update.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+//load  dataset LineItem using localfs 
((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Orders using localfs 
((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Supplier using localfs 
((`path`=`asterix_nc1://data/tpch0.001/supplier.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Region using localfs 
((`path`=`asterix_nc1://data/tpch0.001/region.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Nation using localfs 
((`path`=`asterix_nc1://data/tpch0.001/nation.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Part using localfs 
((`path`=`asterix_nc1://data/tpch0.001/part.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Partsupp using localfs 
((`path`=`asterix_nc1://data/tpch0.001/partsupp.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
+//load  dataset Customer using localfs 
((`path`=`asterix_nc1://data/tpch0.001/customer.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+//
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.4.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.4.query.sqlpp
new file mode 100644
index 0000000..92e218e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function/deployed_function.4.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+//select * from Supplier n where n.s_suppkey = 12;
+//invoke function primary_key("Supplier#000000001");
+//invoke function primary_key([{"id" : 1, "val" : "val1"}, {"id" : 0, "val" : 
"val0"}]);
+invoke function primary_key(1);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.1.ddl.sqlpp
new file mode 100644
index 0000000..ee3c4b8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.SupplierType as
+ closed {
+  s_suppkey : integer,
+  s_name : string,
+  s_address : string,
+  s_nationkey : integer,
+  s_phone : string,
+  s_acctbal : double,
+  s_comment : string
+};
+
+create  dataset Supplier(SupplierType) primary key s_suppkey;
+
+load  dataset Supplier using localfs 
((`path`=`asterix_nc1://data/tpch0.001/supplier.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+create function non_key(x) {
+  select * from Supplier n where n.s_name = x
+};
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.2.update.sqlpp
new file mode 100644
index 0000000..9ada21e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use tpch;
+deploy function non_key@1;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.3.query.sqlpp
new file mode 100644
index 0000000..532ff40
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.3.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+invoke function non_key("Supplier#000000001");
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.4.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.4.update.sqlpp
new file mode 100644
index 0000000..8a906bf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_non_key_lookup/deployed_function_non_key_lookup.4.update.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse tpch;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.1.ddl.sqlpp
new file mode 100644
index 0000000..14fbf98
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.1.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.SupplierType as
+ closed {
+  s_suppkey : integer,
+  s_name : string,
+  s_address : string,
+  s_nationkey : integer,
+  s_phone : string,
+  s_acctbal : double,
+  s_comment : string
+};
+
+create  dataset Supplier(SupplierType) primary key s_suppkey;
+
+load  dataset Supplier using localfs 
((`path`=`asterix_nc1://data/tpch0.001/supplier.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+create function primary_key(x) {
+  select * from Supplier n where n.s_suppkey = x
+};
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.2.update.sqlpp
new file mode 100644
index 0000000..452a4b3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use tpch;
+deploy function primary_key@1;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.3.query.sqlpp
new file mode 100644
index 0000000..a380a90
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+invoke function primary_key(1);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.4.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.4.update.sqlpp
new file mode 100644
index 0000000..8a906bf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_primary_key_lookup/deployed_function_primary_key_lookup.4.update.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse tpch;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.1.ddl.sqlpp
new file mode 100644
index 0000000..90ef82b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.SupplierType as
+ closed {
+  s_suppkey : integer,
+  s_name : string,
+  s_address : string,
+  s_nationkey : integer,
+  s_phone : string,
+  s_acctbal : double,
+  s_comment : string
+};
+
+create  dataset Supplier(SupplierType) primary key s_suppkey;
+create index loc_index on Supplier(s_name);
+
+load  dataset Supplier using localfs 
((`path`=`asterix_nc1://data/tpch0.001/supplier.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+create function primary_key(x) {
+  select * from Supplier n where n.s_name = x
+};
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.2.update.sqlpp
new file mode 100644
index 0000000..452a4b3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use tpch;
+deploy function primary_key@1;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.3.query.sqlpp
new file mode 100644
index 0000000..ff9b81b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+invoke function primary_key("Supplier#000000001");
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.4.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.4.update.sqlpp
new file mode 100644
index 0000000..8a906bf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/deployed_function_secondary_idx_lookup/deployed_function_secondary_idx_lookup.4.update.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse tpch;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.1.ddl.sqlpp
new file mode 100644
index 0000000..7c3f319
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.1.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type tpch.tweetusertype as open {
+  id : int64
+};
+
+create dataset tweetuser(tweetusertype) primary key id;
+
+create function name_lookup(x) {
+  select s from tweetuser s where s.cityName = x
+};
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.2.update.sqlpp
new file mode 100644
index 0000000..c4132a6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use tpch;
+load  dataset tweetuser using localfs 
((`path`=`asterix_nc1://data/twitter/sample.adm`),(`format`=`adm`));
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.3.query.sqlpp
new file mode 100644
index 0000000..372db2f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/user-defined-functions/type_inference/type_inference.3.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use tpch;
+select s, name_lookup(s.cityName) from tweetuser s;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/deployed_function/deployed_function.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/deployed_function/deployed_function.1.adm
new file mode 100644
index 0000000..05a6a9e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/user-defined-functions/deployed_function/deployed_function.1.adm
@@ -0,0 +1 @@
+{ "n": { "s_suppkey": 1, "s_name": "Supplier#000000001", "s_address": " N 
kD4on9OM Ipw3,gf0JBoQDd7tgrzrddZ", "s_nationkey": 17, "s_phone": 
"27-918-335-1736", "s_acctbal": 5755.94, "s_comment": "each slyly above the 
careful" } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 4dd6d97..8dce8a6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6477,6 +6477,21 @@
         <output-dir compare="Text">query-ASTERIXDB-1317</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="user-defined-functions">
+      <compilation-unit name="deployed_function_secondary_idx_lookup">
+        <output-dir compare="Text">deployed_function</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="user-defined-functions">
+      <compilation-unit name="deployed_function_non_key_lookup">
+        <output-dir compare="Text">deployed_function</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="user-defined-functions">
+      <compilation-unit name="deployed_function_primary_key_lookup">
+        <output-dir compare="Text">deployed_function</output-dir>
+      </compilation-unit>
+    </test-case>
     <!-- <test-case FilePath="user-defined-functions">
           <compilation-unit name="query-ASTERIXDB-1308-2">
               <output-dir compare="Text">query-ASTERIXDB-1308-2</output-dir>
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index cb792d9..b93a430 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -82,6 +82,8 @@
         COMPACT,
         EXTERNAL_DATASET_REFRESH,
         SUBSCRIBE_FEED,
+        DEPLOY,
+        INVOKE,
         EXTENSION,
     }
 }
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeployFunctionStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeployFunctionStatement.java
new file mode 100644
index 0000000..e85d4da
--- /dev/null
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeployFunctionStatement.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+
+/**
+ * Statement node for {@code DEPLOY FUNCTION}; it carries nothing but the signature of
+ * the function the statement names.
+ */
+public class DeployFunctionStatement implements Statement {
+
+    /** Signature supplied at construction time; never reassigned. */
+    private final FunctionSignature targetSignature;
+
+    /**
+     * @param fnSig signature of the function this statement refers to
+     */
+    public DeployFunctionStatement(FunctionSignature fnSig) {
+        targetSignature = fnSig;
+    }
+
+    /** @return the signature handed to the constructor */
+    public FunctionSignature getSignature() {
+        return targetSignature;
+    }
+
+    /** @return {@code Kind.DEPLOY} */
+    @Override
+    public Kind getKind() {
+        return Kind.DEPLOY;
+    }
+
+    /** @return {@code Category.DDL} */
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    /** Double-dispatches to the visitor overload declared for this statement type. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+}
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InvokeFunctionStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InvokeFunctionStatement.java
new file mode 100644
index 0000000..7c7e13c
--- /dev/null
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InvokeFunctionStatement.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+
+/**
+ * Statement node for {@code INVOKE FUNCTION}; it carries nothing but the call
+ * expression the statement was built from.
+ */
+public class InvokeFunctionStatement implements Statement {
+
+    /** Call expression supplied at construction time; never reassigned. */
+    private final CallExpr invocation;
+
+    /**
+     * @param functionCallExpr the function call this statement wraps
+     */
+    public InvokeFunctionStatement(CallExpr functionCallExpr) {
+        invocation = functionCallExpr;
+    }
+
+    /** @return the call expression handed to the constructor */
+    public CallExpr getFunctionCallExpr() {
+        return invocation;
+    }
+
+    /** @return {@code Kind.INVOKE} */
+    @Override
+    public Kind getKind() {
+        return Kind.INVOKE;
+    }
+
+    /** @return {@code Category.QUERY} */
+    @Override
+    public byte getCategory() {
+        return Category.QUERY;
+    }
+
+    /** Double-dispatches to the visitor overload declared for this statement type. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+}
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
index 3571dcf..acaa070 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
@@ -191,7 +191,7 @@
         return dependencies;
     }
 
-    private static Function 
lookupUserDefinedFunctionDecl(MetadataTransactionContext mdTxnCtx,
+    public static Function 
lookupUserDefinedFunctionDecl(MetadataTransactionContext mdTxnCtx,
             FunctionSignature signature) throws AlgebricksException {
         if (signature.getNamespace() == null) {
             return null;
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
index c4b23ef..aab0f44 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.lang.common.util;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -71,7 +70,7 @@
     private LangRecordParseUtil() {
     }
 
-    private static void parseExpression(Expression expr, 
ArrayBackedValueStorage serialized)
+    public static void parseExpression(Expression expr, 
ArrayBackedValueStorage serialized)
             throws HyracksDataException, CompilationException {
         switch (expr.getKind()) {
             case LITERAL_EXPRESSION:
@@ -85,10 +84,9 @@
                 break;
             default:
                 throw new HyracksDataException(ErrorCode.ASTERIX, 
ErrorCode.PARSE_ERROR,
-                        NOT_ALLOWED_EXPRESSIONS_ERROR_MESSAGE,
-                        new Serializable[] { 
Expression.Kind.LITERAL_EXPRESSION.toString(),
-                                
Expression.Kind.RECORD_CONSTRUCTOR_EXPRESSION.toString(),
-                                
Expression.Kind.LIST_CONSTRUCTOR_EXPRESSION.toString() });
+                        NOT_ALLOWED_EXPRESSIONS_ERROR_MESSAGE, 
Expression.Kind.LITERAL_EXPRESSION.toString(),
+                        
Expression.Kind.RECORD_CONSTRUCTOR_EXPRESSION.toString(),
+                        
Expression.Kind.LIST_CONSTRUCTOR_EXPRESSION.toString());
         }
     }
 
@@ -160,7 +158,7 @@
         listBuilder.write(serialized.getDataOutput(), true);
     }
 
-    private static void parseLiteral(LiteralExpr objectExpr, 
ArrayBackedValueStorage serialized)
+    public static void parseLiteral(LiteralExpr objectExpr, 
ArrayBackedValueStorage serialized)
             throws HyracksDataException {
         Literal value = objectExpr.getValue();
         switch (value.getLiteralType()) {
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 7ba01ce..72d4389 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -72,6 +72,7 @@
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.statement.DeployFunctionStatement;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.ExternalDetailsDecl;
@@ -82,6 +83,7 @@
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.InvokeFunctionStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -750,6 +752,20 @@
     }
 
     @Override
+    public Void visit(DeployFunctionStatement dfs, Integer step) throws CompilationException {
+        // Print the statement itself ("deploy function <signature>;") instead of a
+        // placeholder string, indented the same way as the other statement printers.
+        // NOTE(review): the signature is rendered through its string form; confirm that
+        // this is what the grammar's FunctionSignature() production reads back.
+        out.println(skip(step) + "deploy function " + dfs.getSignature() + ";");
+        return null;
+    }
+
+    @Override
+    public Void visit(InvokeFunctionStatement ifs, Integer step) throws CompilationException {
+        // Print "invoke function <call>;" instead of a placeholder string. The call
+        // expression is handed back to this visitor so it is rendered by the printer
+        // already responsible for call expressions.
+        out.print(skip(step) + "invoke function ");
+        ifs.getFunctionCallExpr().accept(this, step);
+        out.println(";");
+        return null;
+    }
+
+    @Override
     public Void visit(CreateFeedStatement cfs, Integer step) throws 
CompilationException {
         out.print(skip(step) + "create " + FEED);
         out.print(generateFullName(cfs.getDataverseName(), cfs.getFeedName()));
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
index 117fa77..1b08456 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.statement.DeployFunctionStatement;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.FeedDropStatement;
@@ -42,6 +43,7 @@
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InvokeFunctionStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -176,6 +178,16 @@
     }
 
     @Override
+    public R visit(DeployFunctionStatement dfs, T arg) throws CompilationException {
+        // No-op default: this statement produces no result for this visitor.
+        return null;
+    }
+
+    @Override
+    public R visit(InvokeFunctionStatement ifs, T arg) throws CompilationException {
+        // No-op default: this statement produces no result for this visitor.
+        return null;
+    }
+
+    @Override
     public R visit(FunctionDropStatement del, T arg) throws 
CompilationException {
         return null;
     }
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
index cace925..55f54e8 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
@@ -51,6 +51,7 @@
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DeleteStatement;
+import org.apache.asterix.lang.common.statement.DeployFunctionStatement;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.FeedDropStatement;
@@ -59,6 +60,7 @@
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InvokeFunctionStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -173,6 +175,10 @@
 
     R visit(FunctionDropStatement del, T arg) throws CompilationException;
 
+    R visit(DeployFunctionStatement dfs, T arg) throws CompilationException;
+
+    R visit(InvokeFunctionStatement dfs, T arg) throws CompilationException;
+
     R visit(CompactStatement del, T arg) throws CompilationException;
 
 }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj 
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 9af114b..e71d88b 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -119,6 +119,8 @@
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
+import org.apache.asterix.lang.common.statement.DeployFunctionStatement;
+import org.apache.asterix.lang.common.statement.InvokeFunctionStatement;
 import org.apache.asterix.lang.common.statement.DeleteStatement;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
@@ -386,6 +388,8 @@
     | stmt = UpdateStatement()
     | stmt = UpsertStatement()
     | stmt = ConnectionStatement()
+    | stmt = DeployStatement()
+    | stmt = InvokeStatement()
     | stmt = CompactStatement()
     | stmt = ExplainStatement()
     | stmt = Query(false)
@@ -1184,6 +1188,36 @@
     {
       return stmt;
     }
+}
+
+// DEPLOY FUNCTION <function signature>
+Statement DeployStatement() throws ParseException:
+{
+  FunctionSignature targetSig;
+}
+{
+  <DEPLOY> <FUNCTION> targetSig = FunctionSignature()
+  {
+    return new DeployFunctionStatement(targetSig);
+  }
+}
+
+// INVOKE FUNCTION <function call expression>
+Statement InvokeStatement() throws ParseException:
+{
+  Expression parsedCall;
+}
+{
+  <INVOKE> <FUNCTION> parsedCall = FunctionCallExpr()
+  {
+    // The statement node stores a CallExpr, so the parsed expression is narrowed here.
+    return new InvokeFunctionStatement((CallExpr) parsedCall);
+  }
 }
 
 Statement ConnectionStatement() throws ParseException:
@@ -3302,6 +3336,8 @@
   | <WHERE : "where">
   | <WITH : "with">
   | <WRITE : "write">
+  | <DEPLOY : "deploy">
+  | <INVOKE : "invoke">
 }
 
 <DEFAULT,IN_DBL_BRACE>
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 6eeb6ab..d4df2eb 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -27,95 +27,7 @@
 import org.apache.asterix.common.functions.FunctionConstants;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ABinaryTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ABooleanTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ACircleTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ADateTimeTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ADateTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ADayTimeDurationTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ADoubleTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ADurationTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AFloatTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AGeometryTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AInt16TypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AInt32TypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AInt64TypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AInt8TypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AIntervalTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ALineTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.APoint3DTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.APointTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.APolygonTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ARectangleTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AStringTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ATemporalInstanceTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ATimeTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AUUIDTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AYearMonthDurationTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.AnyTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.BooleanFunctionTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.BooleanOnlyTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.BooleanOrMissingTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.CastTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.CastTypeLaxComputer;
-import 
org.apache.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
-import org.apache.asterix.om.typecomputer.impl.CollectionMemberResultType;
-import 
org.apache.asterix.om.typecomputer.impl.CollectionToSequenceTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ConcatNonNullTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
-import org.apache.asterix.om.typecomputer.impl.FieldAccessByNameResultType;
-import org.apache.asterix.om.typecomputer.impl.FieldAccessNestedResultType;
-import 
org.apache.asterix.om.typecomputer.impl.FullTextContainsResultTypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.GetOverlappingInvervalTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.IfNanOrInfTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.IfMissingOrNullTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.IfMissingTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.IfNullTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.NonTaggedGetItemResultType;
-import org.apache.asterix.om.typecomputer.impl.NotUnknownTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.NullableDoubleTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.NumericAddSubMulDivTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.NumericAggTypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.NumericDoubleOutputFunctionTypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.NumericInt8OutputFunctionTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.NumericRound2TypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.NumericUnaryFunctionTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.OpenARecordTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.OpenRecordConstructorResultType;
-import 
org.apache.asterix.om.typecomputer.impl.OrderedListConstructorTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.OrderedListOfAInt32TypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.OrderedListOfAIntervalTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.OrderedListOfAPointTypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.OrderedListOfAStringTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.OrderedListOfAnyTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.PropagateTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.RecordAddFieldsTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.RecordMergeTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType;
-import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer;
-import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.StringStringTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.StringToInt64ListTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.StringToStringListTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.SubsetCollectionTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.SubstringTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.SwitchCaseComputer;
-import org.apache.asterix.om.typecomputer.impl.ToArrayTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ToBigIntTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ToDoubleTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ToNumberTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ToObjectTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.UnaryBinaryInt64TypeComputer;
-import org.apache.asterix.om.typecomputer.impl.UnaryMinusTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.UnaryStringInt64TypeComputer;
-import 
org.apache.asterix.om.typecomputer.impl.UnorderedListConstructorTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.*;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionManager.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionManager.java
index e1657ff..66a6d8c 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionManager.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionManager.java
@@ -18,8 +18,10 @@
  */
 package org.apache.asterix.om.functions;
 
+import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 
 /**
  * A registry of {@link IFunctionDescriptor} and {@link IFunctionTypeInferer}
@@ -30,4 +32,10 @@
     IFunctionDescriptor lookupFunction(FunctionIdentifier fid) throws 
AlgebricksException;
 
     IFunctionTypeInferer lookupFunctionTypeInferer(FunctionIdentifier fid);
+
+    DeployedJobSpecId lookupDeployedFunctionJobSpecId(FunctionSignature fnsig);
+
+    void registerDeployedFunctionJobSpecId(FunctionSignature fnsig, 
DeployedJobSpecId deployedJobSpecId);
+
+    void deRegisterDeployedFunctionJobSpecId(FunctionSignature fnsig);
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
index 17f7a96..aaa51a1 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
@@ -35,11 +35,13 @@
 
 public class GetJobParameterByNameDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
+
     public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new GetJobParameterByNameDescriptor();
         }
+
     };
 
     @Override
@@ -49,19 +51,20 @@
 
             @Override
             public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext 
ctx) throws HyracksDataException {
-                return new AbstractUnaryStringStringEval(ctx, args[0],
-                        GetJobParameterByNameDescriptor.this.getIdentifier()) {
+                return new AbstractUnaryStringStringEval(ctx, args[0], 
getIdentifier()) {
                     private byte[] result;
 
                     @Override
                     protected void process(UTF8StringPointable inputString, 
IPointable resultPointable)
                             throws IOException {
-                        result = 
ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
-                                inputString.getLength());
+                        if (result == null) {
+                            result = 
ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+                                    inputString.getLength());
+                        }
                     }
 
                     @Override
-                    void writeResult(IPointable resultPointable) throws 
IOException {
+                    void writeResult(IPointable resultPointable) {
                         resultPointable.set(result, 0, result.length);
                     }
                 };
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionManager.java
index d5a6559..ede699a 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionManager.java
@@ -21,7 +21,9 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionManager;
@@ -29,6 +31,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 
 /**
  * Default implementation of {@link IFunctionManager}.
@@ -39,8 +42,10 @@
 
     private final Map<FunctionIdentifier, IFunctionTypeInferer> typeInferers;
 
+    private final Map<FunctionSignature, DeployedJobSpecId> 
deployedFnJobSpecIdMap;
+
     public FunctionManager(FunctionCollection functionCollection) {
-        Map<Pair<FunctionIdentifier, Integer>, IFunctionDescriptorFactory> 
functionsMap = new HashMap<>();
+        Map<Pair<FunctionIdentifier, Integer>, IFunctionDescriptorFactory> 
functionsMap = new ConcurrentHashMap<>();
         Map<FunctionIdentifier, IFunctionTypeInferer> typeInferersMap = new 
HashMap<>();
 
         for (IFunctionDescriptorFactory descriptorFactory : 
functionCollection.getFunctionDescriptorFactories()) {
@@ -54,6 +59,7 @@
 
         this.functions = functionsMap;
         this.typeInferers = typeInferersMap;
+        this.deployedFnJobSpecIdMap = new HashMap<>();
     }
 
     @Override
@@ -70,4 +76,20 @@
     public IFunctionTypeInferer lookupFunctionTypeInferer(FunctionIdentifier 
fid) {
         return typeInferers.get(fid);
     }
+
+    @Override
+    public DeployedJobSpecId lookupDeployedFunctionJobSpecId(FunctionSignature 
fnsig) {
+        return deployedFnJobSpecIdMap.get(fnsig);
+    }
+
+    @Override
+    public void registerDeployedFunctionJobSpecId(FunctionSignature fnsig, 
DeployedJobSpecId deployedJobSpecId) {
+        deployedFnJobSpecIdMap.put(fnsig, deployedJobSpecId);
+    }
+
+    @Override
+    public void deRegisterDeployedFunctionJobSpecId(FunctionSignature fnsig) {
+        deployedFnJobSpecIdMap.remove(fnsig);
+    }
+
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 6faffc7..59c1bc8 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -40,7 +40,7 @@
     private final boolean transactionalWrite;
 
     //To enable new Asterix TxnId for separate deployed job spec invocations
-    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = 
"TxnIdParameter".getBytes();
+    public static final byte[] TRANSACTION_ID_PARAMETER_NAME = 
"TxnIdParameter".getBytes();
 
     public JobEventListenerFactory(TxnId txnId, boolean transactionalWrite) {
         this.txnId = txnId;
@@ -62,7 +62,7 @@
         String AsterixTransactionIdString = new String(jobParameterByteStore
                 .getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0, 
TRANSACTION_ID_PARAMETER_NAME.length));
         if (AsterixTransactionIdString.length() > 0) {
-            this.txnId = new 
TxnId(Integer.parseInt(AsterixTransactionIdString));
+            this.txnId = new TxnId(Long.valueOf(AsterixTransactionIdString));
         }
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractDeployedFunctionCallFromSelectRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractDeployedFunctionCallFromSelectRule.java
new file mode 100644
index 0000000..66ed571
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractDeployedFunctionCallFromSelectRule.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Factors out function expressions from each comparison function in a select
+ * condition by assigning them to variables and referencing those variables.
+ * Example:
+ * Plan with function expressions in the comparison condition of a select.
+ * Generates one assign operator per extracted function expression.
+ *
+ * <pre>
+ * Before plan:
+ *
+ *   select ( eq( funcX($$1), funcY($$2) ) )
+ *
+ * After plan:
+ *
+ *   select (eq($$3,$$4))
+ *   assign [$$4] <- [funcY($$2)]
+ *   assign [$$3] <- [funcX($$1)]
+ * </pre>
+ */
+public class ExtractDeployedFunctionCallFromSelectRule implements 
IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+
+        if (op.getOperatorTag() != LogicalOperatorTag.SELECT) {
+            return false;
+        }
+        SelectOperator joinOp = (SelectOperator) op;
+        ILogicalExpression expr = joinOp.getCondition().getValue();
+
+        return assignFunctionExpressions(joinOp, expr, context);
+
+    }
+
+    private boolean assignFunctionExpressions(AbstractLogicalOperator joinOp, 
ILogicalExpression expr,
+            IOptimizationContext context) throws AlgebricksException {
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) 
expr;
+        FunctionIdentifier fi = fexp.getFunctionIdentifier();
+
+        boolean modified = false;
+        if (fi.equals(AlgebricksBuiltinFunctions.AND) || 
fi.equals(AlgebricksBuiltinFunctions.OR)
+                || processArgumentsToFunction(fi)) {
+            for (Mutable<ILogicalExpression> a : fexp.getArguments()) {
+                if (assignFunctionExpressions(joinOp, a.getValue(), context)) {
+                    modified = true;
+                }
+            }
+            return modified;
+        } else if (AlgebricksBuiltinFunctions.isComparisonFunction(fi) || 
isComparisonFunction(fi)) {
+            for (Mutable<ILogicalExpression> exprRef : fexp.getArguments()) {
+                if (exprRef.getValue().getExpressionTag() == 
LogicalExpressionTag.FUNCTION_CALL) {
+                    LogicalVariable newVar = context.newVar();
+                    AssignOperator newAssign = new AssignOperator(newVar,
+                            new 
MutableObject<ILogicalExpression>(exprRef.getValue().cloneExpression()));
+                    newAssign.setExecutionMode(joinOp.getExecutionMode());
+
+                    // Place assign below joinOp.
+                    List<LogicalVariable> used = new 
ArrayList<LogicalVariable>();
+                    VariableUtilities.getUsedVariables(newAssign, used);
+
+                    Mutable<ILogicalOperator> leftBranchRef = 
joinOp.getInputs().get(0);
+                    ILogicalOperator leftBranch = leftBranchRef.getValue();
+                    List<LogicalVariable> leftBranchVariables = new 
ArrayList<LogicalVariable>();
+                    VariableUtilities.getLiveVariables(leftBranch, 
leftBranchVariables);
+                    if (leftBranchVariables.containsAll(used)) {
+                        // place assign on left branch
+                        if (used.size() == 0) {
+                            // if used variable is empty, push the function 
all the way down to the bottom
+                            while (!(leftBranch instanceof 
EmptyTupleSourceOperator)) {
+                                Mutable<ILogicalOperator> newParent = 
leftBranchRef.getValue().getInputs().get(0);
+                                
newAssign.setExecutionMode(newParent.getValue().getExecutionMode());
+                                leftBranchRef = newParent;
+                                leftBranch = leftBranchRef.getValue();
+                            }
+                        }
+                        newAssign.getInputs().add(new 
MutableObject<ILogicalOperator>(leftBranch));
+                        leftBranchRef.setValue(newAssign);
+                        modified = true;
+                    } else {
+                        Mutable<ILogicalOperator> rightBranchRef = 
joinOp.getInputs().get(1);
+                        ILogicalOperator rightBranch = 
rightBranchRef.getValue();
+                        List<LogicalVariable> rightBranchVariables = new 
ArrayList<LogicalVariable>();
+                        VariableUtilities.getLiveVariables(rightBranch, 
rightBranchVariables);
+                        if (rightBranchVariables.containsAll(used)) {
+                            // place assign on right branch
+                            newAssign.getInputs().add(new 
MutableObject<ILogicalOperator>(rightBranch));
+                            rightBranchRef.setValue(newAssign);
+                            modified = true;
+                        }
+                    }
+
+                    if (modified) {
+                        // Replace original expr with variable reference.
+                        exprRef.setValue(new 
VariableReferenceExpression(newVar));
+                        
context.computeAndSetTypeEnvironmentForOperator(newAssign);
+                        
context.computeAndSetTypeEnvironmentForOperator(joinOp);
+                    }
+                }
+            }
+            return modified;
+        } else {
+            return false;
+        }
+    }
+
+    protected boolean processArgumentsToFunction(FunctionIdentifier fi) {
+        return false;
+    }
+
+    protected boolean isComparisonFunction(FunctionIdentifier fi) {
+        return false;
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 06af2d8..cba1560 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -195,6 +195,7 @@
 
     private void startRunnableActivityClusters() throws HyracksException {
         Set<TaskCluster> taskClusterRoots = new HashSet<>();
+        LOGGER.log(Level.INFO, "Prepare to find runnable task cluster root.");
         findRunnableTaskClusterRoots(taskClusterRoots,
                 
jobRun.getActivityClusterGraph().getActivityClusterMap().values());
         if (LOGGER.isInfoEnabled()) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2611
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3c5a2f4c458ea7e19632d9fb9b5cd77b4e64a71f
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to