abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE] Allow MetadataProvider config to store non String values ......................................................................
[NO ISSUE] Allow MetadataProvider config to store non String values - user model changes: no - storage format changes: no - interface changes: yes Details: - In many cases, we would like to associate a value with a key in MetadataProvider to be accessed during the compilation of jobs. However, currently, there is no place to store such values, so we ended up storing them in the config map. - The config map is a <String, String> map and so, we would write our values as a string and then parse them when needed. - To avoid this, and to avoid introducing a new map, we simply change the config stored in MetadataProvider from <String,String> to <String, Object>. Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2795 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java 24 files changed, 104 insertions(+), 64 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; Murtadha Hubail: Looks good to me, approved; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java index 503a631..d7e05b3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java @@ -60,7 +60,7 @@ } public static IAObject getSimThreshold(MetadataProvider metadata, String simFuncName) { - String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME); + String simThresholValue = metadata.getProperty(SIM_THRESHOLD_PROP_NAME); IAObject ret = null; if (simFuncName.equals(JACCARD_FUNCTION_NAME)) { if (simThresholValue != null) { @@ -103,7 +103,7 @@ public static float getSimThreshold(MetadataProvider metadata) { float simThreshold = JACCARD_DEFAULT_SIM_THRESHOLD; - String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME); + String simThresholValue = metadata.getProperty(SIM_THRESHOLD_PROP_NAME); if (simThresholValue != null) { simThreshold = Float.parseFloat(simThresholValue); } @@ -112,7 +112,7 @@ // TODO: The default function depend on the input types. public static String getSimFunction(MetadataProvider metadata) { - String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME); + String simFunction = metadata.getProperty(SIM_FUNCTION_PROP_NAME); if (simFunction == null) { simFunction = DEFAULT_SIM_FUNCTION; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java index 728aef6..c925b55 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java @@ -978,9 +978,9 @@ } // Gets all variables from the right (inner) branch. - VariableUtilities.getLiveVariables((ILogicalOperator) subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp); + VariableUtilities.getLiveVariables(subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp); // Gets the used variables from the SELECT or JOIN operator. - VariableUtilities.getUsedVariables((ILogicalOperator) topOpRef.getValue(), usedVarsInTopOp); + VariableUtilities.getUsedVariables(topOpRef.getValue(), usedVarsInTopOp); // Excludes the variables in the condition from the outer branch - in join case. for (Iterator<LogicalVariable> iterator = usedVarsInTopOp.iterator(); iterator.hasNext();) { LogicalVariable v = iterator.next(); @@ -1066,7 +1066,7 @@ if (afterTopOpRefs != null) { for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) { varsTmpSet.clear(); - OperatorPropertiesUtil.getFreeVariablesInOp((ILogicalOperator) afterTopOpRef.getValue(), varsTmpSet); + OperatorPropertiesUtil.getFreeVariablesInOp(afterTopOpRef.getValue(), varsTmpSet); copyVarsToAnotherList(varsTmpSet, usedVarsAfterTopOp); } } @@ -1210,7 +1210,7 @@ // For the index-nested-loop join case, // we propagate all variables that come from the outer relation and are used after join operator. // Adds the variables that are both live after JOIN and used after the JOIN operator. - VariableUtilities.getLiveVariables((ILogicalOperator) topOpRef.getValue(), liveVarsAfterTopOp); + VariableUtilities.getLiveVariables(topOpRef.getValue(), liveVarsAfterTopOp); for (LogicalVariable v : usedVarsAfterTopOp) { if (!liveVarsAfterTopOp.contains(v) || findVarInTripleVarList(unionVarMap, v, false)) { continue; @@ -1223,8 +1223,7 @@ // Replaces the original variables in the operators after the SELECT or JOIN operator to satisfy SSA. if (afterTopOpRefs != null) { for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) { - VariableUtilities.substituteVariables((ILogicalOperator) afterTopOpRef.getValue(), - origVarToOutputVarMap, context); + VariableUtilities.substituteVariables(afterTopOpRef.getValue(), origVarToOutputVarMap, context); } } @@ -1808,7 +1807,7 @@ List<LogicalVariable> dataScanRecordVars = new ArrayList<>(); // Collects the used variables in the given select (join) operator. - VariableUtilities.getUsedVariables((ILogicalOperator) topRef.getValue(), usedVarsInSelJoinOpTemp); + VariableUtilities.getUsedVariables(topRef.getValue(), usedVarsInSelJoinOpTemp); // Removes the duplicated variables that are used in the select (join) operator // in case where the variable is used multiple times in the operator's expression. @@ -1841,10 +1840,8 @@ List<LogicalVariable> liveVarsInSubTreeRootOp = new ArrayList<>(); List<LogicalVariable> producedVarsInSubTreeRootOp = new ArrayList<>(); - VariableUtilities.getLiveVariables((ILogicalOperator) indexSubTree.getRootRef().getValue(), - liveVarsInSubTreeRootOp); - VariableUtilities.getProducedVariables((ILogicalOperator) indexSubTree.getRootRef().getValue(), - producedVarsInSubTreeRootOp); + VariableUtilities.getLiveVariables(indexSubTree.getRootRef().getValue(), liveVarsInSubTreeRootOp); + VariableUtilities.getProducedVariables(indexSubTree.getRootRef().getValue(), producedVarsInSubTreeRootOp); copyVarsToAnotherList(liveVarsInSubTreeRootOp, liveVarsAfterSelJoinOp); copyVarsToAnotherList(producedVarsInSubTreeRootOp, liveVarsAfterSelJoinOp); @@ -2039,10 +2036,8 @@ for (Mutable<ILogicalOperator> afterSelectRef : afterSelectOpRefs) { usedVarsAfterSelectOrJoinOp.clear(); producedVarsAfterSelectOrJoinOp.clear(); - VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(), - usedVarsAfterSelectOrJoinOp); - VariableUtilities.getProducedVariables((ILogicalOperator) afterSelectRef.getValue(), - producedVarsAfterSelectOrJoinOp); + VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsAfterSelectOrJoinOp); + VariableUtilities.getProducedVariables(afterSelectRef.getValue(), producedVarsAfterSelectOrJoinOp); // Checks whether COUNT exists in the given plan since we can substitute record variable // with the PK variable as an optimization because COUNT(record) is equal to COUNT(PK). // For this case only, we can replace the record variable with the PK variable. @@ -2051,14 +2046,13 @@ aggOp = (AggregateOperator) afterSelectRefOp; condExprs = aggOp.getExpressions(); for (int i = 0; i < condExprs.size(); i++) { - condExpr = (ILogicalExpression) condExprs.get(i).getValue(); + condExpr = condExprs.get(i).getValue(); if (condExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { condExprFnCall = (AbstractFunctionCallExpression) condExpr; if (condExprFnCall.getFunctionIdentifier() == BuiltinFunctions.COUNT) { // COUNT found. count on the record ($$0) can be replaced as the PK variable. countAggFunctionIsUsedInThePlan = true; - VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(), - usedVarsInCount); + VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsInCount); break; } } @@ -2187,7 +2181,7 @@ } if (matchedFuncExprs.size() == 1) { - condExpr = (ILogicalExpression) optFuncExpr.getFuncExpr(); + condExpr = optFuncExpr.getFuncExpr(); condExprFnCall = (AbstractFunctionCallExpression) condExpr; for (int i = 0; i < condExprFnCall.getArguments().size(); i++) { Mutable<ILogicalExpression> expr = condExprFnCall.getArguments().get(i); @@ -2372,7 +2366,7 @@ AssignOperator assignOp = (AssignOperator) assignUnnestOp; condExprs = assignOp.getExpressions(); for (int i = 0; i < condExprs.size(); i++) { - condExpr = (ILogicalExpression) condExprs.get(i).getValue(); + condExpr = condExprs.get(i).getValue(); if (condExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT && !targetVars.contains(assignOp.getVariables().get(i))) { targetVars.add(assignOp.getVariables().get(i)); @@ -2400,13 +2394,13 @@ case LEFT_OUTER_UNNEST_MAP: return topOp; case UNIONALL: - dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue(); + dataSourceOp = dataSourceOp.getInputs().get(0).getValue(); // Index-only plan case: // The order of operators: 7 unionall <- 6 select <- 5 assign? // <- 4 unnest-map (PIdx) <- 3 split <- 2 unnest-map (SIdx) <- ... // We do this to skip the primary index-search since we are looking for a secondary index-search here. do { - dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue(); + dataSourceOp = dataSourceOp.getInputs().get(0).getValue(); } while (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT && dataSourceOp.hasInputs()); if (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT) { @@ -2415,7 +2409,7 @@ } do { - dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue(); + dataSourceOp = dataSourceOp.getInputs().get(0).getValue(); } while (dataSourceOp.getOperatorTag() != LogicalOperatorTag.UNNEST_MAP && dataSourceOp.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP && dataSourceOp.hasInputs()); @@ -2532,9 +2526,10 @@ * false otherwise. */ public static boolean getNoIndexOnlyOption(IOptimizationContext context) { - Map<String, String> config = context.getMetadataProvider().getConfig(); + Map<String, Object> config = context.getMetadataProvider().getConfig(); if (config.containsKey(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)) { - return Boolean.parseBoolean(config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)); + return Boolean + .parseBoolean((String) config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)); } return AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION_DEFAULT_VALUE; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 1a88170..ca96a96 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -129,6 +129,7 @@ private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter(); // A white list of supported configurable parameters. + public static final String PREFIX_INTERNAL_PARAMETERS = "_internal"; private static final Set<String> CONFIGURABLE_PARAMETER_NAMES = ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY, CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, @@ -217,7 +218,7 @@ generateLogicalPlan(plan, output.config().getPlanFormat()); } CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties(); - Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc); + Map<String, Object> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc); final PhysicalOptimizationConfig physOptConf = getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig, sourceLoc); @@ -235,7 +236,7 @@ builder.setMissableTypeComputer(MissableTypeComputer.INSTANCE); builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE); - int parallelism = getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY), + int parallelism = getParallelism((String) querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY), compilerProperties.getParallelism()); AlgebricksAbsolutePartitionConstraint computationLocations = chooseLocations(clusterInfoCollector, parallelism, metadataProvider.getClusterLocations()); @@ -308,19 +309,19 @@ } protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties, - Map<String, String> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException { + Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException { int frameSize = compilerProperties.getFrameSize(); int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY), compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT, sourceLoc); int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY), compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY, sourceLoc); int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY), compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc); int textSearchFrameLimit = getFrameLimit(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, - querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY), + (String) querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY), compilerProperties.getTextSearchMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_TEXTSEARCH, sourceLoc); final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig(); physOptConf.setFrameSize(frameSize); @@ -482,10 +483,11 @@ } // Validates if the query contains unsupported query parameters. - private static Map<String, String> validateConfig(Map<String, String> config, SourceLocation sourceLoc) + private static Map<String, Object> validateConfig(Map<String, Object> config, SourceLocation sourceLoc) throws AlgebricksException { for (String parameterName : config.keySet()) { - if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) { + if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName) + && !parameterName.startsWith(PREFIX_INTERNAL_PARAMETERS)) { throw AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER, sourceLoc, parameterName); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 99ed42d..d4a52fb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -82,7 +82,8 @@ // Note: The current implementation of the wait for completion flag is problematic due to locking issues: // Locks obtained during the start of the feed are not released, and so, the feed can't be stopped // and also, read locks over dataverses, datasets, etc, are never released. - boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)); + boolean wait = + Boolean.parseBoolean((String) metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)); if (wait) { IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED)); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java index e9087c2..bb80406 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java @@ -83,7 +83,7 @@ String outputType = ConstantExpressionUtil.getStringArgument(f, 5); MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); DataSourceId asid = new DataSourceId(dataverse, getTargetFeed); - String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); + String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName); if (policy == null) { policy = BuiltinFeedPolicies.getFeedPolicy(policyName); @@ -93,7 +93,7 @@ } } ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>(); - String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS); + String csLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS); List<LogicalVariable> pkVars = new ArrayList<>(); FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation, metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 76116c4..5b4c198 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -421,10 +421,13 @@ } } - protected void handleSetStatement(Statement stmt, Map<String, String> config) { + protected void handleSetStatement(Statement stmt, Map<String, String> config) throws CompilationException { SetStatement ss = (SetStatement) stmt; String pname = ss.getPropName(); String pvalue = ss.getPropValue(); + if (pname.startsWith(APIFramework.PREFIX_INTERNAL_PARAMETERS)) { + throw new CompilationException(ErrorCode.ILLEGAL_SET_PARAMETER, pname); + } config.put(pname, pvalue); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index c1ccda7..b07a30e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -45,6 +45,7 @@ import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Dataverse; @@ -684,6 +685,9 @@ int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider, IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + MetadataProvider mdProvider = new MetadataProvider( + (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), + MetadataBuiltinEntities.DEFAULT_DATAVERSE); org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -700,8 +704,9 @@ new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), - modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1, - frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory, + modificationCallbackFactory, searchCallbackFactory, + keyIndexes.length, recordType, -1, frameOpCallbackFactory == null + ? dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory, MissingWriterFactory.INSTANCE, hasSecondaries); RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset, filterFields == null ? 0 : filterFields.length, recordType, metaType); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java index 830307b..e4e300a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java @@ -127,7 +127,7 @@ MetadataProvider metadataProvider = mock(MetadataProvider.class); @SuppressWarnings("unchecked") - Map<String, String> config = mock(Map.class); + Map<String, Object> config = mock(Map.class); when(metadataProvider.getDefaultDataverseName()).thenReturn(dvName); when(metadataProvider.getConfig()).thenReturn(config); when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true"); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java index 8f28752..1d7ad2d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java @@ -50,5 +50,10 @@ public void close() throws IOException { // No Op } + + @Override + public void fail(Throwable th) { + // No Op + } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index f570aa8..d8f3f7d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -177,6 +177,7 @@ public static final int FIELD_NOT_OF_TYPE = 1089; public static final int ARRAY_FIELD_ELEMENTS_MUST_BE_OF_TYPE = 1090; public static final int COMPILATION_TYPE_MISMATCH_GENERIC = 1091; + public static final int ILLEGAL_SET_PARAMETER = 1092; // Feed errors public static final int DATAFLOW_ILLEGAL_STATE = 3001; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 8c09d75..5c67f4e 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -164,6 +164,7 @@ 1088 = Required field %1$s was not found 1089 = Field %1$s must be of type %2$s but found to be of type %3$s 1090 = Field %1$s must be of an array of type %2$s but found to contain an item of type %3$s +1092 = Parameter %1$s cannot be set # Feed Errors 3001 = Illegal state. diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java index 1f564a5..6752f77 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java @@ -102,7 +102,7 @@ if (expression == null) { return functionDecls; } - String value = metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS); + String value = (String) metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS); boolean includePrivateFunctions = (value != null) ? Boolean.valueOf(value.toLowerCase()) : false; Set<CallExpr> functionCalls = functionCollector.getFunctionCalls(expression); for (CallExpr functionCall : functionCalls) { diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java index 09a9f90..ecd8c8e 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java @@ -168,7 +168,7 @@ } protected void inlineWithExpressions() throws CompilationException { - String inlineWith = metadataProvider.getConfig().get(INLINE_WITH); + String inlineWith = (String) metadataProvider.getConfig().get(INLINE_WITH); if (inlineWith != null && inlineWith.equalsIgnoreCase(NOT_INLINE_WITH)) { return; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index f2be6cd..f3f5c56 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -148,7 +148,7 @@ private final StorageProperties storageProperties; private final IFunctionManager functionManager; private final LockList locks; - private final Map<String, String> config; + private final Map<String, Object> config; private Dataverse defaultDataverse; private MetadataTransactionContext mdTxnCtx; @@ -173,8 +173,18 @@ config = new HashMap<>(); } - public String getPropertyValue(String propertyName) { - return config.get(propertyName); + @SuppressWarnings("unchecked") + public <T> T getProperty(String name) { + return (T) config.get(name); + } + + public void setProperty(String name, Object value) { + config.put(name, value); + } + + @SuppressWarnings("unchecked") + public <T> T removeProperty(String name) { + return (T) config.remove(name); } public void disableBlockingOperator() { @@ -186,7 +196,7 @@ } @Override - public Map<String, String> getConfig() { + public Map<String, Object> getConfig() { return config; } @@ -296,7 +306,7 @@ */ public ARecordType findOutputRecordType() throws AlgebricksException { return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(), - getPropertyValue("output-record-type")); + getProperty("output-record-type")); } public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index d0a22b7..8471d45 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -682,7 +682,7 @@ datasetPartitions, isSink); } - public IFrameOperationCallbackFactory getFrameOpCallbackFactory() { + public IFrameOperationCallbackFactory getFrameOpCallbackFactory(MetadataProvider mdProvider) { return NoOpFrameOperationCallbackFactory.INSTANCE; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 6d81145..bbdfadf 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -447,7 +447,7 @@ RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh, missingWriterFactory, modificationCallbackFactory, searchCallbackFactory, - dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries); + dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries); return new Pair<>(op, splitsAndConstraint.second); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index dba6760..3df1b13 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -256,13 +256,18 @@ @Override public void frameCompleted() throws HyracksDataException { - callback.frameCompleted(); appender.write(writer, true); + callback.frameCompleted(); } @Override public void close() throws IOException { callback.close(); + } + + @Override + public void fail(Throwable th) { + callback.fail(th); } }; } catch (Throwable e) { // NOSONAR: Re-thrown @@ -305,7 +310,12 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int itemCount = accessor.getTupleCount(); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + try { + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + } catch (Throwable th) {// NOSONAR: Must notify of all failures + frameOpCallback.fail(th); + throw th; + } if (itemCount > 0) { lastRecordInTimeStamp = System.currentTimeMillis(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index be227ec..c0d18df 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.transaction.management.service.logging; -import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT; - import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -705,7 +703,7 @@ } } catch (Exception e) { LOGGER.log(Level.ERROR, "LogFlusher is terminating abnormally. System is in unusable state; halting", e); - ExitUtil.halt(EC_IMMEDIATE_HALT); + ExitUtil.halt(ExitUtil.EC_TXN_LOG_FLUSHER_FAILURE); throw new AssertionError("not reachable"); } finally { if (interrupted) { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java index ae66ee2..efa9c1c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java @@ -212,5 +212,6 @@ IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException; - public Map<String, String> getConfig(); + public Map<String, Object> getConfig(); + } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 066b6c1..9ca4bdb 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -580,7 +580,7 @@ } case UNORDERED_PARTITIONED: { List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet()); - String hashMergeHint = context.getMetadataProvider().getConfig().get(HASH_MERGE); + String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE); if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) { pop = new HashPartitionExchangePOperator(varList, domain); break; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java index 627e972..77623c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java @@ -73,7 +73,7 @@ } if (registrationException != null) { LOGGER.fatal("Registering with {} failed with exception", this, registrationException); - ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT); + ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE); } return getCcId(); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 040fe03..cde26d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.ipc.impl; -import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -344,7 +342,7 @@ } } catch (Exception e) { LOGGER.fatal("Unrecoverable networking failure; Halting...", e); - ExitUtil.halt(EC_IMMEDIATE_HALT); + ExitUtil.halt(ExitUtil.EC_NETWORK_FAILURE); } return false; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java index df78c53..78e5862 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java @@ -33,4 +33,11 @@ * @throws HyracksDataException */ void frameCompleted() throws HyracksDataException; + + /** + * Called when the task has failed. + * + * @param th + */ + void fail(Throwable th); } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index f7c401a..75865b9d 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -42,6 +42,9 @@ public static final int EC_UNHANDLED_EXCEPTION = 11; public static final int EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES = 12; public static final int EC_ERROR_CREATING_RESOURCES = 13; + public static final int EC_TXN_LOG_FLUSHER_FAILURE = 14; + public static final int EC_NODE_REGISTRATION_FAILURE = 15; + public static final int EC_NETWORK_FAILURE = 16; public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22; public static final int EC_IMMEDIATE_HALT = 33; public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44; -- To view, visit https://asterix-gerrit.ics.uci.edu/2795 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06 Gerrit-PatchSet: 10 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
