abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2795
Change subject: [NO ISSUE] Allow MetadataProvider config to store non String values ...................................................................... [NO ISSUE] Allow MetadataProvider config to store non String values Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06 --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java 21 files changed, 91 insertions(+), 62 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/95/2795/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java index 503a631..d7e05b3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java @@ -60,7 +60,7 @@ } public static IAObject getSimThreshold(MetadataProvider metadata, String simFuncName) { - String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME); + String simThresholValue = metadata.getProperty(SIM_THRESHOLD_PROP_NAME); IAObject ret = null; if (simFuncName.equals(JACCARD_FUNCTION_NAME)) { if (simThresholValue != null) { @@ -103,7 +103,7 @@ public static float getSimThreshold(MetadataProvider metadata) { float simThreshold = JACCARD_DEFAULT_SIM_THRESHOLD; - String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME); + String simThresholValue = metadata.getProperty(SIM_THRESHOLD_PROP_NAME); if (simThresholValue != null) { simThreshold = Float.parseFloat(simThresholValue); } @@ -112,7 +112,7 @@ // TODO: The default function depend on the input types. public static String getSimFunction(MetadataProvider metadata) { - String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME); + String simFunction = metadata.getProperty(SIM_FUNCTION_PROP_NAME); if (simFunction == null) { simFunction = DEFAULT_SIM_FUNCTION; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java index 728aef6..c925b55 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java @@ -978,9 +978,9 @@ } // Gets all variables from the right (inner) branch. - VariableUtilities.getLiveVariables((ILogicalOperator) subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp); + VariableUtilities.getLiveVariables(subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp); // Gets the used variables from the SELECT or JOIN operator. - VariableUtilities.getUsedVariables((ILogicalOperator) topOpRef.getValue(), usedVarsInTopOp); + VariableUtilities.getUsedVariables(topOpRef.getValue(), usedVarsInTopOp); // Excludes the variables in the condition from the outer branch - in join case. for (Iterator<LogicalVariable> iterator = usedVarsInTopOp.iterator(); iterator.hasNext();) { LogicalVariable v = iterator.next(); @@ -1066,7 +1066,7 @@ if (afterTopOpRefs != null) { for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) { varsTmpSet.clear(); - OperatorPropertiesUtil.getFreeVariablesInOp((ILogicalOperator) afterTopOpRef.getValue(), varsTmpSet); + OperatorPropertiesUtil.getFreeVariablesInOp(afterTopOpRef.getValue(), varsTmpSet); copyVarsToAnotherList(varsTmpSet, usedVarsAfterTopOp); } } @@ -1210,7 +1210,7 @@ // For the index-nested-loop join case, // we propagate all variables that come from the outer relation and are used after join operator. // Adds the variables that are both live after JOIN and used after the JOIN operator. - VariableUtilities.getLiveVariables((ILogicalOperator) topOpRef.getValue(), liveVarsAfterTopOp); + VariableUtilities.getLiveVariables(topOpRef.getValue(), liveVarsAfterTopOp); for (LogicalVariable v : usedVarsAfterTopOp) { if (!liveVarsAfterTopOp.contains(v) || findVarInTripleVarList(unionVarMap, v, false)) { continue; @@ -1223,8 +1223,7 @@ // Replaces the original variables in the operators after the SELECT or JOIN operator to satisfy SSA. if (afterTopOpRefs != null) { for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) { - VariableUtilities.substituteVariables((ILogicalOperator) afterTopOpRef.getValue(), - origVarToOutputVarMap, context); + VariableUtilities.substituteVariables(afterTopOpRef.getValue(), origVarToOutputVarMap, context); } } @@ -1808,7 +1807,7 @@ List<LogicalVariable> dataScanRecordVars = new ArrayList<>(); // Collects the used variables in the given select (join) operator. - VariableUtilities.getUsedVariables((ILogicalOperator) topRef.getValue(), usedVarsInSelJoinOpTemp); + VariableUtilities.getUsedVariables(topRef.getValue(), usedVarsInSelJoinOpTemp); // Removes the duplicated variables that are used in the select (join) operator // in case where the variable is used multiple times in the operator's expression. @@ -1841,10 +1840,8 @@ List<LogicalVariable> liveVarsInSubTreeRootOp = new ArrayList<>(); List<LogicalVariable> producedVarsInSubTreeRootOp = new ArrayList<>(); - VariableUtilities.getLiveVariables((ILogicalOperator) indexSubTree.getRootRef().getValue(), - liveVarsInSubTreeRootOp); - VariableUtilities.getProducedVariables((ILogicalOperator) indexSubTree.getRootRef().getValue(), - producedVarsInSubTreeRootOp); + VariableUtilities.getLiveVariables(indexSubTree.getRootRef().getValue(), liveVarsInSubTreeRootOp); + VariableUtilities.getProducedVariables(indexSubTree.getRootRef().getValue(), producedVarsInSubTreeRootOp); copyVarsToAnotherList(liveVarsInSubTreeRootOp, liveVarsAfterSelJoinOp); copyVarsToAnotherList(producedVarsInSubTreeRootOp, liveVarsAfterSelJoinOp); @@ -2039,10 +2036,8 @@ for (Mutable<ILogicalOperator> afterSelectRef : afterSelectOpRefs) { usedVarsAfterSelectOrJoinOp.clear(); producedVarsAfterSelectOrJoinOp.clear(); - VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(), - usedVarsAfterSelectOrJoinOp); - VariableUtilities.getProducedVariables((ILogicalOperator) afterSelectRef.getValue(), - producedVarsAfterSelectOrJoinOp); + VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsAfterSelectOrJoinOp); + VariableUtilities.getProducedVariables(afterSelectRef.getValue(), producedVarsAfterSelectOrJoinOp); // Checks whether COUNT exists in the given plan since we can substitute record variable // with the PK variable as an optimization because COUNT(record) is equal to COUNT(PK). // For this case only, we can replace the record variable with the PK variable. @@ -2051,14 +2046,13 @@ aggOp = (AggregateOperator) afterSelectRefOp; condExprs = aggOp.getExpressions(); for (int i = 0; i < condExprs.size(); i++) { - condExpr = (ILogicalExpression) condExprs.get(i).getValue(); + condExpr = condExprs.get(i).getValue(); if (condExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { condExprFnCall = (AbstractFunctionCallExpression) condExpr; if (condExprFnCall.getFunctionIdentifier() == BuiltinFunctions.COUNT) { // COUNT found. count on the record ($$0) can be replaced as the PK variable. countAggFunctionIsUsedInThePlan = true; - VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(), - usedVarsInCount); + VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsInCount); break; } } @@ -2187,7 +2181,7 @@ } if (matchedFuncExprs.size() == 1) { - condExpr = (ILogicalExpression) optFuncExpr.getFuncExpr(); + condExpr = optFuncExpr.getFuncExpr(); condExprFnCall = (AbstractFunctionCallExpression) condExpr; for (int i = 0; i < condExprFnCall.getArguments().size(); i++) { Mutable<ILogicalExpression> expr = condExprFnCall.getArguments().get(i); @@ -2372,7 +2366,7 @@ AssignOperator assignOp = (AssignOperator) assignUnnestOp; condExprs = assignOp.getExpressions(); for (int i = 0; i < condExprs.size(); i++) { - condExpr = (ILogicalExpression) condExprs.get(i).getValue(); + condExpr = condExprs.get(i).getValue(); if (condExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT && !targetVars.contains(assignOp.getVariables().get(i))) { targetVars.add(assignOp.getVariables().get(i)); @@ -2400,13 +2394,13 @@ case LEFT_OUTER_UNNEST_MAP: return topOp; case UNIONALL: - dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue(); + dataSourceOp = dataSourceOp.getInputs().get(0).getValue(); // Index-only plan case: // The order of operators: 7 unionall <- 6 select <- 5 assign? // <- 4 unnest-map (PIdx) <- 3 split <- 2 unnest-map (SIdx) <- ... // We do this to skip the primary index-search since we are looking for a secondary index-search here. do { - dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue(); + dataSourceOp = dataSourceOp.getInputs().get(0).getValue(); } while (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT && dataSourceOp.hasInputs()); if (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT) { @@ -2415,7 +2409,7 @@ } do { - dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue(); + dataSourceOp = dataSourceOp.getInputs().get(0).getValue(); } while (dataSourceOp.getOperatorTag() != LogicalOperatorTag.UNNEST_MAP && dataSourceOp.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP && dataSourceOp.hasInputs()); @@ -2532,9 +2526,10 @@ * false otherwise. */ public static boolean getNoIndexOnlyOption(IOptimizationContext context) { - Map<String, String> config = context.getMetadataProvider().getConfig(); + Map<String, Object> config = context.getMetadataProvider().getConfig(); if (config.containsKey(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)) { - return Boolean.parseBoolean(config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)); + return Boolean + .parseBoolean((String) config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)); } return AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION_DEFAULT_VALUE; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 1a88170..5b961f0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -217,7 +217,7 @@ generateLogicalPlan(plan, output.config().getPlanFormat()); } CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties(); - Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc); + Map<String, Object> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc); final PhysicalOptimizationConfig physOptConf = getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig, sourceLoc); @@ -235,7 +235,7 @@ builder.setMissableTypeComputer(MissableTypeComputer.INSTANCE); builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE); - int parallelism = getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY), + int parallelism = getParallelism((String) querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY), compilerProperties.getParallelism()); AlgebricksAbsolutePartitionConstraint computationLocations = chooseLocations(clusterInfoCollector, parallelism, metadataProvider.getClusterLocations()); @@ -308,19 +308,19 @@ } protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties, - Map<String, String> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException { + Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException { int frameSize = compilerProperties.getFrameSize(); int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY), compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT, sourceLoc); int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY), compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY, sourceLoc); int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY), compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc); int textSearchFrameLimit = getFrameLimit(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY), compilerProperties.getTextSearchMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_TEXTSEARCH, sourceLoc); final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig(); physOptConf.setFrameSize(frameSize); @@ -482,7 +482,7 @@ } // Validates if the query contains unsupported query parameters. - private static Map<String, String> validateConfig(Map<String, String> config, SourceLocation sourceLoc) + private static Map<String, Object> validateConfig(Map<String, Object> config, SourceLocation sourceLoc) throws AlgebricksException { for (String parameterName : config.keySet()) { if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 99ed42d..d4a52fb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -82,7 +82,8 @@ // Note: The current implementation of the wait for completion flag is problematic due to locking issues: // Locks obtained during the start of the feed are not released, and so, the feed can't be stopped // and also, read locks over dataverses, datasets, etc, are never released. - boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)); + boolean wait = + Boolean.parseBoolean((String) metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)); if (wait) { IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED)); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java index e9087c2..bb80406 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java @@ -83,7 +83,7 @@ String outputType = ConstantExpressionUtil.getStringArgument(f, 5); MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); DataSourceId asid = new DataSourceId(dataverse, getTargetFeed); - String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); + String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName); if (policy == null) { policy = BuiltinFeedPolicies.getFeedPolicy(policyName); @@ -93,7 +93,7 @@ } } ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>(); - String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS); + String csLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS); List<LogicalVariable> pkVars = new ArrayList<>(); FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation, metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index c1ccda7..b07a30e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -45,6 +45,7 @@ import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Dataverse; @@ -684,6 +685,9 @@ int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider, IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + MetadataProvider mdProvider = new MetadataProvider( + (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), + MetadataBuiltinEntities.DEFAULT_DATAVERSE); org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -700,8 +704,9 @@ new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), - modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1, - frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory, + modificationCallbackFactory, searchCallbackFactory, + keyIndexes.length, recordType, -1, frameOpCallbackFactory == null + ? dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory, MissingWriterFactory.INSTANCE, hasSecondaries); RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset, filterFields == null ? 0 : filterFields.length, recordType, metaType); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java index 830307b..e4e300a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java @@ -127,7 +127,7 @@ MetadataProvider metadataProvider = mock(MetadataProvider.class); @SuppressWarnings("unchecked") - Map<String, String> config = mock(Map.class); + Map<String, Object> config = mock(Map.class); when(metadataProvider.getDefaultDataverseName()).thenReturn(dvName); when(metadataProvider.getConfig()).thenReturn(config); when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true"); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java index 8f28752..1d7ad2d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java @@ -50,5 +50,10 @@ public void close() throws IOException { // No Op } + + @Override + public void fail(Throwable th) { + // No Op + } } } diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java index 1f564a5..6752f77 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java @@ -102,7 +102,7 @@ if (expression == null) { return functionDecls; } - String value = metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS); + String value = (String) metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS); boolean includePrivateFunctions = (value != null) ? Boolean.valueOf(value.toLowerCase()) : false; Set<CallExpr> functionCalls = functionCollector.getFunctionCalls(expression); for (CallExpr functionCall : functionCalls) { diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java index 09a9f90..ecd8c8e 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java @@ -168,7 +168,7 @@ } protected void inlineWithExpressions() throws CompilationException { - String inlineWith = metadataProvider.getConfig().get(INLINE_WITH); + String inlineWith = (String) metadataProvider.getConfig().get(INLINE_WITH); if (inlineWith != null && inlineWith.equalsIgnoreCase(NOT_INLINE_WITH)) { return; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 7f8d31d..38fd2f28 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -148,7 +148,7 @@ private final StorageProperties storageProperties; private final IFunctionManager functionManager; private final LockList locks; - private final Map<String, String> config; + private final Map<String, Object> config; private Dataverse defaultDataverse; private MetadataTransactionContext mdTxnCtx; @@ -173,8 +173,13 @@ config = new HashMap<>(); } - public String getPropertyValue(String propertyName) { - return config.get(propertyName); + @SuppressWarnings("unchecked") + public <T> T getProperty(String name) { + return (T) config.get(name); + } + + public void setProperty(String name, Object value) { + config.put(name, value); } public void disableBlockingOperator() { @@ -186,7 +191,7 @@ } @Override - public Map<String, String> getConfig() { + public Map<String, Object> getConfig() { return config; } @@ -296,7 +301,7 @@ */ public ARecordType findOutputRecordType() throws AlgebricksException { return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(), - getPropertyValue("output-record-type")); + getProperty("output-record-type")); } public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException { @@ -1545,6 +1550,7 @@ } } + @Override public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index d0a22b7..8471d45 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -682,7 +682,7 @@ datasetPartitions, isSink); } - public IFrameOperationCallbackFactory getFrameOpCallbackFactory() { + public IFrameOperationCallbackFactory getFrameOpCallbackFactory(MetadataProvider mdProvider) { return NoOpFrameOperationCallbackFactory.INSTANCE; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 6d81145..bbdfadf 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -447,7 +447,7 @@ RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh, missingWriterFactory, modificationCallbackFactory, searchCallbackFactory, - dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries); + dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries); return new Pair<>(op, splitsAndConstraint.second); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index dba6760..3df1b13 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -256,13 +256,18 @@ @Override public void frameCompleted() throws HyracksDataException { - callback.frameCompleted(); appender.write(writer, true); + callback.frameCompleted(); } @Override public void close() throws IOException { callback.close(); + } + + @Override + public void fail(Throwable th) { + callback.fail(th); } }; } catch (Throwable e) { // NOSONAR: Re-thrown @@ -305,7 +310,12 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int itemCount = accessor.getTupleCount(); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + try { + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + } catch (Throwable th) {// NOSONAR: Must notify of all failures + frameOpCallback.fail(th); + throw th; + } if (itemCount > 0) { lastRecordInTimeStamp = System.currentTimeMillis(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index be227ec..c0d18df 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.transaction.management.service.logging; -import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT; - import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -705,7 +703,7 @@ } } catch (Exception e) { LOGGER.log(Level.ERROR, "LogFlusher is terminating abnormally. System is in unusable state; halting", e); - ExitUtil.halt(EC_IMMEDIATE_HALT); + ExitUtil.halt(ExitUtil.EC_TXN_LOG_FLUSHER_FAILURE); throw new AssertionError("not reachable"); } finally { if (interrupted) { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java index ae66ee2..efa9c1c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java @@ -212,5 +212,6 @@ IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException; - public Map<String, String> getConfig(); + public Map<String, Object> getConfig(); + } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 066b6c1..9ca4bdb 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -580,7 +580,7 @@ } case UNORDERED_PARTITIONED: { List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet()); - String hashMergeHint = context.getMetadataProvider().getConfig().get(HASH_MERGE); + String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE); if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) { pop = new HashPartitionExchangePOperator(varList, domain); break; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java index 627e972..77623c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java @@ -73,7 +73,7 @@ } if (registrationException != null) { LOGGER.fatal("Registering with {} failed with exception", this, registrationException); - ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT); + ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE); } return getCcId(); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 040fe03..cde26d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.ipc.impl; -import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -344,7 +342,7 @@ } } catch (Exception e) { LOGGER.fatal("Unrecoverable networking failure; Halting...", e); - ExitUtil.halt(EC_IMMEDIATE_HALT); + ExitUtil.halt(ExitUtil.EC_NETWORK_FAILURE); } return false; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java index df78c53..78e5862 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java @@ -33,4 +33,11 @@ * @throws HyracksDataException */ void frameCompleted() throws HyracksDataException; + + /** + * Called when the task has failed. + * + * @param th + */ + void fail(Throwable th); } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index 14cfc59..652cb34 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -40,6 +40,9 @@ public static final int EC_INCONSISTENT_METADATA = 8; public static final int EC_UNCAUGHT_THROWABLE = 9; public static final int EC_UNHANDLED_EXCEPTION = 11; + public static final int EC_TXN_LOG_FLUSHER_FAILURE = 12; + public static final int EC_NODE_REGISTRATION_FAILURE = 13; + public static final int EC_NETWORK_FAILURE = 14; public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22; public static final int EC_IMMEDIATE_HALT = 33; public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44; -- To view, visit https://asterix-gerrit.ics.uci.edu/2795 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>