abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2795

Change subject: [NO ISSUE] Allow MetadataProvider config to store non String 
values
......................................................................

[NO ISSUE] Allow MetadataProvider config to store non String values

Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
M 
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
M 
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
21 files changed, 91 insertions(+), 62 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/95/2795/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
index 503a631..d7e05b3 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
@@ -60,7 +60,7 @@
     }
 
     public static IAObject getSimThreshold(MetadataProvider metadata, String 
simFuncName) {
-        String simThresholValue = 
metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        String simThresholValue = 
metadata.getProperty(SIM_THRESHOLD_PROP_NAME);
         IAObject ret = null;
         if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
             if (simThresholValue != null) {
@@ -103,7 +103,7 @@
 
     public static float getSimThreshold(MetadataProvider metadata) {
         float simThreshold = JACCARD_DEFAULT_SIM_THRESHOLD;
-        String simThresholValue = 
metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        String simThresholValue = 
metadata.getProperty(SIM_THRESHOLD_PROP_NAME);
         if (simThresholValue != null) {
             simThreshold = Float.parseFloat(simThresholValue);
         }
@@ -112,7 +112,7 @@
 
     // TODO: The default function depend on the input types.
     public static String getSimFunction(MetadataProvider metadata) {
-        String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME);
+        String simFunction = metadata.getProperty(SIM_FUNCTION_PROP_NAME);
         if (simFunction == null) {
             simFunction = DEFAULT_SIM_FUNCTION;
         }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index 728aef6..c925b55 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -978,9 +978,9 @@
         }
 
         // Gets all variables from the right (inner) branch.
-        VariableUtilities.getLiveVariables((ILogicalOperator) 
subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp);
+        VariableUtilities.getLiveVariables(subTree.getRootRef().getValue(), 
liveVarsInSubTreeRootOp);
         // Gets the used variables from the SELECT or JOIN operator.
-        VariableUtilities.getUsedVariables((ILogicalOperator) 
topOpRef.getValue(), usedVarsInTopOp);
+        VariableUtilities.getUsedVariables(topOpRef.getValue(), 
usedVarsInTopOp);
         // Excludes the variables in the condition from the outer branch - in 
join case.
         for (Iterator<LogicalVariable> iterator = usedVarsInTopOp.iterator(); 
iterator.hasNext();) {
             LogicalVariable v = iterator.next();
@@ -1066,7 +1066,7 @@
         if (afterTopOpRefs != null) {
             for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) {
                 varsTmpSet.clear();
-                OperatorPropertiesUtil.getFreeVariablesInOp((ILogicalOperator) 
afterTopOpRef.getValue(), varsTmpSet);
+                
OperatorPropertiesUtil.getFreeVariablesInOp(afterTopOpRef.getValue(), 
varsTmpSet);
                 copyVarsToAnotherList(varsTmpSet, usedVarsAfterTopOp);
             }
         }
@@ -1210,7 +1210,7 @@
         // For the index-nested-loop join case,
         // we propagate all variables that come from the outer relation and 
are used after join operator.
         // Adds the variables that are both live after JOIN and used after the 
JOIN operator.
-        VariableUtilities.getLiveVariables((ILogicalOperator) 
topOpRef.getValue(), liveVarsAfterTopOp);
+        VariableUtilities.getLiveVariables(topOpRef.getValue(), 
liveVarsAfterTopOp);
         for (LogicalVariable v : usedVarsAfterTopOp) {
             if (!liveVarsAfterTopOp.contains(v) || 
findVarInTripleVarList(unionVarMap, v, false)) {
                 continue;
@@ -1223,8 +1223,7 @@
         // Replaces the original variables in the operators after the SELECT 
or JOIN operator to satisfy SSA.
         if (afterTopOpRefs != null) {
             for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) {
-                VariableUtilities.substituteVariables((ILogicalOperator) 
afterTopOpRef.getValue(),
-                        origVarToOutputVarMap, context);
+                
VariableUtilities.substituteVariables(afterTopOpRef.getValue(), 
origVarToOutputVarMap, context);
             }
         }
 
@@ -1808,7 +1807,7 @@
         List<LogicalVariable> dataScanRecordVars = new ArrayList<>();
 
         // Collects the used variables in the given select (join) operator.
-        VariableUtilities.getUsedVariables((ILogicalOperator) 
topRef.getValue(), usedVarsInSelJoinOpTemp);
+        VariableUtilities.getUsedVariables(topRef.getValue(), 
usedVarsInSelJoinOpTemp);
 
         // Removes the duplicated variables that are used in the select (join) 
operator
         // in case where the variable is used multiple times in the operator's 
expression.
@@ -1841,10 +1840,8 @@
         List<LogicalVariable> liveVarsInSubTreeRootOp = new ArrayList<>();
         List<LogicalVariable> producedVarsInSubTreeRootOp = new ArrayList<>();
 
-        VariableUtilities.getLiveVariables((ILogicalOperator) 
indexSubTree.getRootRef().getValue(),
-                liveVarsInSubTreeRootOp);
-        VariableUtilities.getProducedVariables((ILogicalOperator) 
indexSubTree.getRootRef().getValue(),
-                producedVarsInSubTreeRootOp);
+        
VariableUtilities.getLiveVariables(indexSubTree.getRootRef().getValue(), 
liveVarsInSubTreeRootOp);
+        
VariableUtilities.getProducedVariables(indexSubTree.getRootRef().getValue(), 
producedVarsInSubTreeRootOp);
 
         copyVarsToAnotherList(liveVarsInSubTreeRootOp, liveVarsAfterSelJoinOp);
         copyVarsToAnotherList(producedVarsInSubTreeRootOp, 
liveVarsAfterSelJoinOp);
@@ -2039,10 +2036,8 @@
         for (Mutable<ILogicalOperator> afterSelectRef : afterSelectOpRefs) {
             usedVarsAfterSelectOrJoinOp.clear();
             producedVarsAfterSelectOrJoinOp.clear();
-            VariableUtilities.getUsedVariables((ILogicalOperator) 
afterSelectRef.getValue(),
-                    usedVarsAfterSelectOrJoinOp);
-            VariableUtilities.getProducedVariables((ILogicalOperator) 
afterSelectRef.getValue(),
-                    producedVarsAfterSelectOrJoinOp);
+            VariableUtilities.getUsedVariables(afterSelectRef.getValue(), 
usedVarsAfterSelectOrJoinOp);
+            VariableUtilities.getProducedVariables(afterSelectRef.getValue(), 
producedVarsAfterSelectOrJoinOp);
             // Checks whether COUNT exists in the given plan since we can 
substitute record variable
             // with the PK variable as an optimization because COUNT(record) 
is equal to COUNT(PK).
             // For this case only, we can replace the record variable with the 
PK variable.
@@ -2051,14 +2046,13 @@
                 aggOp = (AggregateOperator) afterSelectRefOp;
                 condExprs = aggOp.getExpressions();
                 for (int i = 0; i < condExprs.size(); i++) {
-                    condExpr = (ILogicalExpression) 
condExprs.get(i).getValue();
+                    condExpr = condExprs.get(i).getValue();
                     if (condExpr.getExpressionTag() == 
LogicalExpressionTag.FUNCTION_CALL) {
                         condExprFnCall = (AbstractFunctionCallExpression) 
condExpr;
                         if (condExprFnCall.getFunctionIdentifier() == 
BuiltinFunctions.COUNT) {
                             // COUNT found. count on the record ($$0) can be 
replaced as the PK variable.
                             countAggFunctionIsUsedInThePlan = true;
-                            
VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(),
-                                    usedVarsInCount);
+                            
VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsInCount);
                             break;
                         }
                     }
@@ -2187,7 +2181,7 @@
         }
 
         if (matchedFuncExprs.size() == 1) {
-            condExpr = (ILogicalExpression) optFuncExpr.getFuncExpr();
+            condExpr = optFuncExpr.getFuncExpr();
             condExprFnCall = (AbstractFunctionCallExpression) condExpr;
             for (int i = 0; i < condExprFnCall.getArguments().size(); i++) {
                 Mutable<ILogicalExpression> expr = 
condExprFnCall.getArguments().get(i);
@@ -2372,7 +2366,7 @@
                 AssignOperator assignOp = (AssignOperator) assignUnnestOp;
                 condExprs = assignOp.getExpressions();
                 for (int i = 0; i < condExprs.size(); i++) {
-                    condExpr = (ILogicalExpression) 
condExprs.get(i).getValue();
+                    condExpr = condExprs.get(i).getValue();
                     if (condExpr.getExpressionTag() == 
LogicalExpressionTag.CONSTANT
                             && 
!targetVars.contains(assignOp.getVariables().get(i))) {
                         targetVars.add(assignOp.getVariables().get(i));
@@ -2400,13 +2394,13 @@
             case LEFT_OUTER_UNNEST_MAP:
                 return topOp;
             case UNIONALL:
-                dataSourceOp = (ILogicalOperator) 
dataSourceOp.getInputs().get(0).getValue();
+                dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
                 // Index-only plan case:
                 // The order of operators: 7 unionall <- 6 select <- 5 assign?
                 // <- 4 unnest-map (PIdx) <- 3 split <- 2 unnest-map (SIdx) <- 
...
                 // We do this to skip the primary index-search since we are 
looking for a secondary index-search here.
                 do {
-                    dataSourceOp = (ILogicalOperator) 
dataSourceOp.getInputs().get(0).getValue();
+                    dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
                 } while (dataSourceOp.getOperatorTag() != 
LogicalOperatorTag.SPLIT && dataSourceOp.hasInputs());
 
                 if (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT) 
{
@@ -2415,7 +2409,7 @@
                 }
 
                 do {
-                    dataSourceOp = (ILogicalOperator) 
dataSourceOp.getInputs().get(0).getValue();
+                    dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
                 } while (dataSourceOp.getOperatorTag() != 
LogicalOperatorTag.UNNEST_MAP
                         && dataSourceOp.getOperatorTag() != 
LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP
                         && dataSourceOp.hasInputs());
@@ -2532,9 +2526,10 @@
      *         false otherwise.
      */
     public static boolean getNoIndexOnlyOption(IOptimizationContext context) {
-        Map<String, String> config = context.getMetadataProvider().getConfig();
+        Map<String, Object> config = context.getMetadataProvider().getConfig();
         if 
(config.containsKey(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION))
 {
-            return 
Boolean.parseBoolean(config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION));
+            return Boolean
+                    .parseBoolean((String) 
config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION));
         }
         return 
AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION_DEFAULT_VALUE;
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 1a88170..5b961f0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -217,7 +217,7 @@
             generateLogicalPlan(plan, output.config().getPlanFormat());
         }
         CompilerProperties compilerProperties = 
metadataProvider.getApplicationContext().getCompilerProperties();
-        Map<String, String> querySpecificConfig = 
validateConfig(metadataProvider.getConfig(), sourceLoc);
+        Map<String, Object> querySpecificConfig = 
validateConfig(metadataProvider.getConfig(), sourceLoc);
         final PhysicalOptimizationConfig physOptConf =
                 getPhysicalOptimizationConfig(compilerProperties, 
querySpecificConfig, sourceLoc);
 
@@ -235,7 +235,7 @@
         builder.setMissableTypeComputer(MissableTypeComputer.INSTANCE);
         builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE);
 
-        int parallelism = 
getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
+        int parallelism = getParallelism((String) 
querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
                 compilerProperties.getParallelism());
         AlgebricksAbsolutePartitionConstraint computationLocations =
                 chooseLocations(clusterInfoCollector, parallelism, 
metadataProvider.getClusterLocations());
@@ -308,19 +308,19 @@
     }
 
     protected PhysicalOptimizationConfig 
getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
-            Map<String, String> querySpecificConfig, SourceLocation sourceLoc) 
throws AlgebricksException {
+            Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) 
throws AlgebricksException {
         int frameSize = compilerProperties.getFrameSize();
         int sortFrameLimit = 
getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
-                
querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+                (String) 
querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
                 compilerProperties.getSortMemorySize(), frameSize, 
MIN_FRAME_LIMIT_FOR_SORT, sourceLoc);
         int groupFrameLimit = 
getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-                
querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+                (String) 
querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
                 compilerProperties.getGroupMemorySize(), frameSize, 
MIN_FRAME_LIMIT_FOR_GROUP_BY, sourceLoc);
         int joinFrameLimit = 
getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
-                
querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+                (String) 
querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
                 compilerProperties.getJoinMemorySize(), frameSize, 
MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc);
         int textSearchFrameLimit = 
getFrameLimit(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY,
-                
querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY),
+                (String) 
querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY),
                 compilerProperties.getTextSearchMemorySize(), frameSize, 
MIN_FRAME_LIMIT_FOR_TEXTSEARCH, sourceLoc);
         final PhysicalOptimizationConfig physOptConf = 
OptimizationConfUtil.getPhysicalOptimizationConfig();
         physOptConf.setFrameSize(frameSize);
@@ -482,7 +482,7 @@
     }
 
     // Validates if the query contains unsupported query parameters.
-    private static Map<String, String> validateConfig(Map<String, String> 
config, SourceLocation sourceLoc)
+    private static Map<String, Object> validateConfig(Map<String, Object> 
config, SourceLocation sourceLoc)
             throws AlgebricksException {
         for (String parameterName : config.keySet()) {
             if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 99ed42d..d4a52fb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -82,7 +82,8 @@
         // Note: The current implementation of the wait for completion flag is 
problematic due to locking issues:
         // Locks obtained during the start of the feed are not released, and 
so, the feed can't be stopped
         // and also, read locks over dataverses, datasets, etc, are never 
released.
-        boolean wait = 
Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+        boolean wait =
+                Boolean.parseBoolean((String) 
metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
         if (wait) {
             IActiveEntityEventSubscriber stoppedSubscriber =
                     new WaitForStateSubscriber(this, 
EnumSet.of(ActivityState.STOPPED));
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index e9087c2..bb80406 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -83,7 +83,7 @@
         String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
         MetadataProvider metadataProvider = (MetadataProvider) 
context.getMetadataProvider();
         DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
-        String policyName = 
metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+        String policyName = (String) 
metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
         FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, 
policyName);
         if (policy == null) {
             policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
@@ -93,7 +93,7 @@
             }
         }
         ArrayList<LogicalVariable> feedDataScanOutputVariables = new 
ArrayList<>();
-        String csLocations = 
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+        String csLocations = (String) 
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
         List<LogicalVariable> pkVars = new ArrayList<>();
         FeedDataSource ds = createFeedDataSource(asid, targetDataset, 
sourceFeedName, subscriptionLocation,
                 metadataProvider, policy, outputType, csLocations, 
unnest.getVariable(), context, pkVars);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index c1ccda7..b07a30e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -45,6 +45,7 @@
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
@@ -684,6 +685,9 @@
             int[] keyIndexes, List<Integer> keyIndicators, 
StorageComponentProvider storageComponentProvider,
             IFrameOperationCallbackFactory frameOpCallbackFactory, boolean 
hasSecondaries) throws Exception {
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        MetadataProvider mdProvider = new MetadataProvider(
+                (ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(),
+                MetadataBuiltinEntities.DEFAULT_DATAVERSE);
         
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, 
Map<String, String>> mergePolicy =
                 DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -700,8 +704,9 @@
                 new LSMPrimaryUpsertOperatorNodePushable(ctx, 
ctx.getTaskAttemptId().getTaskId().getPartition(),
                         indexHelperFactory, 
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                         recordDescProvider.getInputRecordDescriptor(new 
ActivityId(new OperatorDescriptorId(0), 0), 0),
-                        modificationCallbackFactory, searchCallbackFactory, 
keyIndexes.length, recordType, -1,
-                        frameOpCallbackFactory == null ? 
dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+                        modificationCallbackFactory, searchCallbackFactory,
+                        keyIndexes.length, recordType, -1, 
frameOpCallbackFactory == null
+                                ? 
dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory,
                         MissingWriterFactory.INSTANCE, hasSecondaries);
         RecordDescriptor upsertOutRecDesc = 
getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, 
metaType);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 830307b..e4e300a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -127,7 +127,7 @@
             MetadataProvider metadataProvider = mock(MetadataProvider.class);
 
             @SuppressWarnings("unchecked")
-            Map<String, String> config = mock(Map.class);
+            Map<String, Object> config = mock(Map.class);
             
when(metadataProvider.getDefaultDataverseName()).thenReturn(dvName);
             when(metadataProvider.getConfig()).thenReturn(config);
             
when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true");
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 8f28752..1d7ad2d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -50,5 +50,10 @@
         public void close() throws IOException {
             // No Op
         }
+
+        @Override
+        public void fail(Throwable th) {
+            // No Op
+        }
     }
 }
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
index 1f564a5..6752f77 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
@@ -102,7 +102,7 @@
         if (expression == null) {
             return functionDecls;
         }
-        String value = 
metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS);
+        String value = (String) 
metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS);
         boolean includePrivateFunctions = (value != null) ? 
Boolean.valueOf(value.toLowerCase()) : false;
         Set<CallExpr> functionCalls = 
functionCollector.getFunctionCalls(expression);
         for (CallExpr functionCall : functionCalls) {
diff --git 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
index 09a9f90..ecd8c8e 100644
--- 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
+++ 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
@@ -168,7 +168,7 @@
     }
 
     protected void inlineWithExpressions() throws CompilationException {
-        String inlineWith = metadataProvider.getConfig().get(INLINE_WITH);
+        String inlineWith = (String) 
metadataProvider.getConfig().get(INLINE_WITH);
         if (inlineWith != null && 
inlineWith.equalsIgnoreCase(NOT_INLINE_WITH)) {
             return;
         }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7f8d31d..38fd2f28 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -148,7 +148,7 @@
     private final StorageProperties storageProperties;
     private final IFunctionManager functionManager;
     private final LockList locks;
-    private final Map<String, String> config;
+    private final Map<String, Object> config;
 
     private Dataverse defaultDataverse;
     private MetadataTransactionContext mdTxnCtx;
@@ -173,8 +173,13 @@
         config = new HashMap<>();
     }
 
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
+    @SuppressWarnings("unchecked")
+    public <T> T getProperty(String name) {
+        return (T) config.get(name);
+    }
+
+    public void setProperty(String name, Object value) {
+        config.put(name, value);
     }
 
     public void disableBlockingOperator() {
@@ -186,7 +191,7 @@
     }
 
     @Override
-    public Map<String, String> getConfig() {
+    public Map<String, Object> getConfig() {
         return config;
     }
 
@@ -296,7 +301,7 @@
      */
     public ARecordType findOutputRecordType() throws AlgebricksException {
         return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, 
getDefaultDataverseName(),
-                getPropertyValue("output-record-type"));
+                getProperty("output-record-type"));
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws 
AlgebricksException {
@@ -1545,6 +1550,7 @@
         }
     }
 
+    @Override
     public AsterixTupleFilterFactory 
createTupleFilterFactory(IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, 
JobGenContext context)
             throws AlgebricksException {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index d0a22b7..8471d45 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -682,7 +682,7 @@
                 datasetPartitions, isSink);
     }
 
-    public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+    public IFrameOperationCallbackFactory 
getFrameOpCallbackFactory(MetadataProvider mdProvider) {
         return NoOpFrameOperationCallbackFactory.INSTANCE;
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6d81145..bbdfadf 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -447,7 +447,7 @@
         RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, 
outputTypeTraits);
         op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, 
fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, 
searchCallbackFactory,
-                dataset.getFrameOpCallbackFactory(), numKeys, itemType, 
fieldIdx, hasSecondaries);
+                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, 
itemType, fieldIdx, hasSecondaries);
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index dba6760..3df1b13 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -256,13 +256,18 @@
 
                 @Override
                 public void frameCompleted() throws HyracksDataException {
-                    callback.frameCompleted();
                     appender.write(writer, true);
+                    callback.frameCompleted();
                 }
 
                 @Override
                 public void close() throws IOException {
                     callback.close();
+                }
+
+                @Override
+                public void fail(Throwable th) {
+                    callback.fail(th);
                 }
             };
         } catch (Throwable e) { // NOSONAR: Re-thrown
@@ -305,7 +310,12 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int itemCount = accessor.getTupleCount();
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        try {
+            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback);
+        } catch (Throwable th) {// NOSONAR: Must notify of all failures
+            frameOpCallback.fail(th);
+            throw th;
+        }
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index be227ec..c0d18df 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
-import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
-
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -705,7 +703,7 @@
             }
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "LogFlusher is terminating abnormally. 
System is in unusable state; halting", e);
-            ExitUtil.halt(EC_IMMEDIATE_HALT);
+            ExitUtil.halt(ExitUtil.EC_TXN_LOG_FLUSHER_FAILURE);
             throw new AssertionError("not reachable");
         } finally {
             if (interrupted) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index ae66ee2..efa9c1c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -212,5 +212,6 @@
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, 
JobGenContext context)
             throws AlgebricksException;
 
-    public Map<String, String> getConfig();
+    public Map<String, Object> getConfig();
+
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 066b6c1..9ca4bdb 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -580,7 +580,7 @@
                 }
                 case UNORDERED_PARTITIONED: {
                     List<LogicalVariable> varList = new 
ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
-                    String hashMergeHint = 
context.getMetadataProvider().getConfig().get(HASH_MERGE);
+                    String hashMergeHint = (String) 
context.getMetadataProvider().getConfig().get(HASH_MERGE);
                     if (hashMergeHint == null || 
!hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
                         pop = new HashPartitionExchangePOperator(varList, 
domain);
                         break;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 627e972..77623c2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -73,7 +73,7 @@
         }
         if (registrationException != null) {
             LOGGER.fatal("Registering with {} failed with exception", this, 
registrationException);
-            ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
+            ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
         }
         return getCcId();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 040fe03..cde26d5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.ipc.impl;
 
-import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -344,7 +342,7 @@
                 }
             } catch (Exception e) {
                 LOGGER.fatal("Unrecoverable networking failure; Halting...", 
e);
-                ExitUtil.halt(EC_IMMEDIATE_HALT);
+                ExitUtil.halt(ExitUtil.EC_NETWORK_FAILURE);
             }
             return false;
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index df78c53..78e5862 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -33,4 +33,11 @@
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
+
+    /**
+     * Called when the task has failed.
+     *
+     * @param th
+     */
+    void fail(Throwable th);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 14cfc59..652cb34 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -40,6 +40,9 @@
     public static final int EC_INCONSISTENT_METADATA = 8;
     public static final int EC_UNCAUGHT_THROWABLE = 9;
     public static final int EC_UNHANDLED_EXCEPTION = 11;
+    public static final int EC_TXN_LOG_FLUSHER_FAILURE = 12;
+    public static final int EC_NODE_REGISTRATION_FAILURE = 13;
+    public static final int EC_NETWORK_FAILURE = 14;
     public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2795
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to