abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2795
Change subject: [NO ISSUE] Allow MetadataProvider config to store non String
values
......................................................................
[NO ISSUE] Allow MetadataProvider config to store non String values
Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
M
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
M
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
21 files changed, 91 insertions(+), 62 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/95/2795/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
index 503a631..d7e05b3 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
@@ -60,7 +60,7 @@
}
public static IAObject getSimThreshold(MetadataProvider metadata, String
simFuncName) {
- String simThresholValue =
metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+ String simThresholValue =
metadata.getProperty(SIM_THRESHOLD_PROP_NAME);
IAObject ret = null;
if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
if (simThresholValue != null) {
@@ -103,7 +103,7 @@
public static float getSimThreshold(MetadataProvider metadata) {
float simThreshold = JACCARD_DEFAULT_SIM_THRESHOLD;
- String simThresholValue =
metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+ String simThresholValue =
metadata.getProperty(SIM_THRESHOLD_PROP_NAME);
if (simThresholValue != null) {
simThreshold = Float.parseFloat(simThresholValue);
}
@@ -112,7 +112,7 @@
// TODO: The default function depend on the input types.
public static String getSimFunction(MetadataProvider metadata) {
- String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME);
+ String simFunction = metadata.getProperty(SIM_FUNCTION_PROP_NAME);
if (simFunction == null) {
simFunction = DEFAULT_SIM_FUNCTION;
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index 728aef6..c925b55 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -978,9 +978,9 @@
}
// Gets all variables from the right (inner) branch.
- VariableUtilities.getLiveVariables((ILogicalOperator)
subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp);
+ VariableUtilities.getLiveVariables(subTree.getRootRef().getValue(),
liveVarsInSubTreeRootOp);
// Gets the used variables from the SELECT or JOIN operator.
- VariableUtilities.getUsedVariables((ILogicalOperator)
topOpRef.getValue(), usedVarsInTopOp);
+ VariableUtilities.getUsedVariables(topOpRef.getValue(),
usedVarsInTopOp);
// Excludes the variables in the condition from the outer branch - in
join case.
for (Iterator<LogicalVariable> iterator = usedVarsInTopOp.iterator();
iterator.hasNext();) {
LogicalVariable v = iterator.next();
@@ -1066,7 +1066,7 @@
if (afterTopOpRefs != null) {
for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) {
varsTmpSet.clear();
- OperatorPropertiesUtil.getFreeVariablesInOp((ILogicalOperator)
afterTopOpRef.getValue(), varsTmpSet);
+
OperatorPropertiesUtil.getFreeVariablesInOp(afterTopOpRef.getValue(),
varsTmpSet);
copyVarsToAnotherList(varsTmpSet, usedVarsAfterTopOp);
}
}
@@ -1210,7 +1210,7 @@
// For the index-nested-loop join case,
// we propagate all variables that come from the outer relation and
are used after join operator.
// Adds the variables that are both live after JOIN and used after the
JOIN operator.
- VariableUtilities.getLiveVariables((ILogicalOperator)
topOpRef.getValue(), liveVarsAfterTopOp);
+ VariableUtilities.getLiveVariables(topOpRef.getValue(),
liveVarsAfterTopOp);
for (LogicalVariable v : usedVarsAfterTopOp) {
if (!liveVarsAfterTopOp.contains(v) ||
findVarInTripleVarList(unionVarMap, v, false)) {
continue;
@@ -1223,8 +1223,7 @@
// Replaces the original variables in the operators after the SELECT
or JOIN operator to satisfy SSA.
if (afterTopOpRefs != null) {
for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) {
- VariableUtilities.substituteVariables((ILogicalOperator)
afterTopOpRef.getValue(),
- origVarToOutputVarMap, context);
+
VariableUtilities.substituteVariables(afterTopOpRef.getValue(),
origVarToOutputVarMap, context);
}
}
@@ -1808,7 +1807,7 @@
List<LogicalVariable> dataScanRecordVars = new ArrayList<>();
// Collects the used variables in the given select (join) operator.
- VariableUtilities.getUsedVariables((ILogicalOperator)
topRef.getValue(), usedVarsInSelJoinOpTemp);
+ VariableUtilities.getUsedVariables(topRef.getValue(),
usedVarsInSelJoinOpTemp);
// Removes the duplicated variables that are used in the select (join)
operator
// in case where the variable is used multiple times in the operator's
expression.
@@ -1841,10 +1840,8 @@
List<LogicalVariable> liveVarsInSubTreeRootOp = new ArrayList<>();
List<LogicalVariable> producedVarsInSubTreeRootOp = new ArrayList<>();
- VariableUtilities.getLiveVariables((ILogicalOperator)
indexSubTree.getRootRef().getValue(),
- liveVarsInSubTreeRootOp);
- VariableUtilities.getProducedVariables((ILogicalOperator)
indexSubTree.getRootRef().getValue(),
- producedVarsInSubTreeRootOp);
+
VariableUtilities.getLiveVariables(indexSubTree.getRootRef().getValue(),
liveVarsInSubTreeRootOp);
+
VariableUtilities.getProducedVariables(indexSubTree.getRootRef().getValue(),
producedVarsInSubTreeRootOp);
copyVarsToAnotherList(liveVarsInSubTreeRootOp, liveVarsAfterSelJoinOp);
copyVarsToAnotherList(producedVarsInSubTreeRootOp,
liveVarsAfterSelJoinOp);
@@ -2039,10 +2036,8 @@
for (Mutable<ILogicalOperator> afterSelectRef : afterSelectOpRefs) {
usedVarsAfterSelectOrJoinOp.clear();
producedVarsAfterSelectOrJoinOp.clear();
- VariableUtilities.getUsedVariables((ILogicalOperator)
afterSelectRef.getValue(),
- usedVarsAfterSelectOrJoinOp);
- VariableUtilities.getProducedVariables((ILogicalOperator)
afterSelectRef.getValue(),
- producedVarsAfterSelectOrJoinOp);
+ VariableUtilities.getUsedVariables(afterSelectRef.getValue(),
usedVarsAfterSelectOrJoinOp);
+ VariableUtilities.getProducedVariables(afterSelectRef.getValue(),
producedVarsAfterSelectOrJoinOp);
// Checks whether COUNT exists in the given plan since we can
substitute record variable
// with the PK variable as an optimization because COUNT(record)
is equal to COUNT(PK).
// For this case only, we can replace the record variable with the
PK variable.
@@ -2051,14 +2046,13 @@
aggOp = (AggregateOperator) afterSelectRefOp;
condExprs = aggOp.getExpressions();
for (int i = 0; i < condExprs.size(); i++) {
- condExpr = (ILogicalExpression)
condExprs.get(i).getValue();
+ condExpr = condExprs.get(i).getValue();
if (condExpr.getExpressionTag() ==
LogicalExpressionTag.FUNCTION_CALL) {
condExprFnCall = (AbstractFunctionCallExpression)
condExpr;
if (condExprFnCall.getFunctionIdentifier() ==
BuiltinFunctions.COUNT) {
// COUNT found. count on the record ($$0) can be
replaced as the PK variable.
countAggFunctionIsUsedInThePlan = true;
-
VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(),
- usedVarsInCount);
+
VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsInCount);
break;
}
}
@@ -2187,7 +2181,7 @@
}
if (matchedFuncExprs.size() == 1) {
- condExpr = (ILogicalExpression) optFuncExpr.getFuncExpr();
+ condExpr = optFuncExpr.getFuncExpr();
condExprFnCall = (AbstractFunctionCallExpression) condExpr;
for (int i = 0; i < condExprFnCall.getArguments().size(); i++) {
Mutable<ILogicalExpression> expr =
condExprFnCall.getArguments().get(i);
@@ -2372,7 +2366,7 @@
AssignOperator assignOp = (AssignOperator) assignUnnestOp;
condExprs = assignOp.getExpressions();
for (int i = 0; i < condExprs.size(); i++) {
- condExpr = (ILogicalExpression)
condExprs.get(i).getValue();
+ condExpr = condExprs.get(i).getValue();
if (condExpr.getExpressionTag() ==
LogicalExpressionTag.CONSTANT
&&
!targetVars.contains(assignOp.getVariables().get(i))) {
targetVars.add(assignOp.getVariables().get(i));
@@ -2400,13 +2394,13 @@
case LEFT_OUTER_UNNEST_MAP:
return topOp;
case UNIONALL:
- dataSourceOp = (ILogicalOperator)
dataSourceOp.getInputs().get(0).getValue();
+ dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
// Index-only plan case:
// The order of operators: 7 unionall <- 6 select <- 5 assign?
// <- 4 unnest-map (PIdx) <- 3 split <- 2 unnest-map (SIdx) <-
...
// We do this to skip the primary index-search since we are
looking for a secondary index-search here.
do {
- dataSourceOp = (ILogicalOperator)
dataSourceOp.getInputs().get(0).getValue();
+ dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
} while (dataSourceOp.getOperatorTag() !=
LogicalOperatorTag.SPLIT && dataSourceOp.hasInputs());
if (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT)
{
@@ -2415,7 +2409,7 @@
}
do {
- dataSourceOp = (ILogicalOperator)
dataSourceOp.getInputs().get(0).getValue();
+ dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
} while (dataSourceOp.getOperatorTag() !=
LogicalOperatorTag.UNNEST_MAP
&& dataSourceOp.getOperatorTag() !=
LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP
&& dataSourceOp.hasInputs());
@@ -2532,9 +2526,10 @@
* false otherwise.
*/
public static boolean getNoIndexOnlyOption(IOptimizationContext context) {
- Map<String, String> config = context.getMetadataProvider().getConfig();
+ Map<String, Object> config = context.getMetadataProvider().getConfig();
if
(config.containsKey(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION))
{
- return
Boolean.parseBoolean(config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION));
+ return Boolean
+ .parseBoolean((String)
config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION));
}
return
AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION_DEFAULT_VALUE;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 1a88170..5b961f0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -217,7 +217,7 @@
generateLogicalPlan(plan, output.config().getPlanFormat());
}
CompilerProperties compilerProperties =
metadataProvider.getApplicationContext().getCompilerProperties();
- Map<String, String> querySpecificConfig =
validateConfig(metadataProvider.getConfig(), sourceLoc);
+ Map<String, Object> querySpecificConfig =
validateConfig(metadataProvider.getConfig(), sourceLoc);
final PhysicalOptimizationConfig physOptConf =
getPhysicalOptimizationConfig(compilerProperties,
querySpecificConfig, sourceLoc);
@@ -235,7 +235,7 @@
builder.setMissableTypeComputer(MissableTypeComputer.INSTANCE);
builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE);
- int parallelism =
getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
+ int parallelism = getParallelism((String)
querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
compilerProperties.getParallelism());
AlgebricksAbsolutePartitionConstraint computationLocations =
chooseLocations(clusterInfoCollector, parallelism,
metadataProvider.getClusterLocations());
@@ -308,19 +308,19 @@
}
protected PhysicalOptimizationConfig
getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
- Map<String, String> querySpecificConfig, SourceLocation sourceLoc)
throws AlgebricksException {
+ Map<String, Object> querySpecificConfig, SourceLocation sourceLoc)
throws AlgebricksException {
int frameSize = compilerProperties.getFrameSize();
int sortFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+ (String)
querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
compilerProperties.getSortMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_SORT, sourceLoc);
int groupFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+ (String)
querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
compilerProperties.getGroupMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_GROUP_BY, sourceLoc);
int joinFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+ (String)
querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
compilerProperties.getJoinMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc);
int textSearchFrameLimit =
getFrameLimit(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY,
-
querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY),
+ (String)
querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY),
compilerProperties.getTextSearchMemorySize(), frameSize,
MIN_FRAME_LIMIT_FOR_TEXTSEARCH, sourceLoc);
final PhysicalOptimizationConfig physOptConf =
OptimizationConfUtil.getPhysicalOptimizationConfig();
physOptConf.setFrameSize(frameSize);
@@ -482,7 +482,7 @@
}
// Validates if the query contains unsupported query parameters.
- private static Map<String, String> validateConfig(Map<String, String>
config, SourceLocation sourceLoc)
+ private static Map<String, Object> validateConfig(Map<String, Object>
config, SourceLocation sourceLoc)
throws AlgebricksException {
for (String parameterName : config.keySet()) {
if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 99ed42d..d4a52fb 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -82,7 +82,8 @@
// Note: The current implementation of the wait for completion flag is
problematic due to locking issues:
// Locks obtained during the start of the feed are not released, and
so, the feed can't be stopped
// and also, read locks over dataverses, datasets, etc, are never
released.
- boolean wait =
Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+ boolean wait =
+ Boolean.parseBoolean((String)
metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
if (wait) {
IActiveEntityEventSubscriber stoppedSubscriber =
new WaitForStateSubscriber(this,
EnumSet.of(ActivityState.STOPPED));
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index e9087c2..bb80406 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -83,7 +83,7 @@
String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
MetadataProvider metadataProvider = (MetadataProvider)
context.getMetadataProvider();
DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
- String policyName =
metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+ String policyName = (String)
metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse,
policyName);
if (policy == null) {
policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
@@ -93,7 +93,7 @@
}
}
ArrayList<LogicalVariable> feedDataScanOutputVariables = new
ArrayList<>();
- String csLocations =
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+ String csLocations = (String)
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
List<LogicalVariable> pkVars = new ArrayList<>();
FeedDataSource ds = createFeedDataSource(asid, targetDataset,
sourceFeedName, subscriptionLocation,
metadataProvider, policy, outputType, csLocations,
unnest.getVariable(), context, pkVars);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index c1ccda7..b07a30e 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -45,6 +45,7 @@
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
@@ -684,6 +685,9 @@
int[] keyIndexes, List<Integer> keyIndicators,
StorageComponentProvider storageComponentProvider,
IFrameOperationCallbackFactory frameOpCallbackFactory, boolean
hasSecondaries) throws Exception {
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
+ MetadataProvider mdProvider = new MetadataProvider(
+ (ICcApplicationContext)
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(),
+ MetadataBuiltinEntities.DEFAULT_DATAVERSE);
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory,
Map<String, String>> mergePolicy =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -700,8 +704,9 @@
new LSMPrimaryUpsertOperatorNodePushable(ctx,
ctx.getTaskAttemptId().getTaskId().getPartition(),
indexHelperFactory,
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
recordDescProvider.getInputRecordDescriptor(new
ActivityId(new OperatorDescriptorId(0), 0), 0),
- modificationCallbackFactory, searchCallbackFactory,
keyIndexes.length, recordType, -1,
- frameOpCallbackFactory == null ?
dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+ modificationCallbackFactory, searchCallbackFactory,
+ keyIndexes.length, recordType, -1,
frameOpCallbackFactory == null
+ ?
dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory,
MissingWriterFactory.INSTANCE, hasSecondaries);
RecordDescriptor upsertOutRecDesc =
getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
filterFields == null ? 0 : filterFields.length, recordType,
metaType);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 830307b..e4e300a 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -127,7 +127,7 @@
MetadataProvider metadataProvider = mock(MetadataProvider.class);
@SuppressWarnings("unchecked")
- Map<String, String> config = mock(Map.class);
+ Map<String, Object> config = mock(Map.class);
when(metadataProvider.getDefaultDataverseName()).thenReturn(dvName);
when(metadataProvider.getConfig()).thenReturn(config);
when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true");
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 8f28752..1d7ad2d 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -50,5 +50,10 @@
public void close() throws IOException {
// No Op
}
+
+ @Override
+ public void fail(Throwable th) {
+ // No Op
+ }
}
}
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
index 1f564a5..6752f77 100644
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
+++
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
@@ -102,7 +102,7 @@
if (expression == null) {
return functionDecls;
}
- String value =
metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS);
+ String value = (String)
metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS);
boolean includePrivateFunctions = (value != null) ?
Boolean.valueOf(value.toLowerCase()) : false;
Set<CallExpr> functionCalls =
functionCollector.getFunctionCalls(expression);
for (CallExpr functionCall : functionCalls) {
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
index 09a9f90..ecd8c8e 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
@@ -168,7 +168,7 @@
}
protected void inlineWithExpressions() throws CompilationException {
- String inlineWith = metadataProvider.getConfig().get(INLINE_WITH);
+ String inlineWith = (String)
metadataProvider.getConfig().get(INLINE_WITH);
if (inlineWith != null &&
inlineWith.equalsIgnoreCase(NOT_INLINE_WITH)) {
return;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7f8d31d..38fd2f28 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -148,7 +148,7 @@
private final StorageProperties storageProperties;
private final IFunctionManager functionManager;
private final LockList locks;
- private final Map<String, String> config;
+ private final Map<String, Object> config;
private Dataverse defaultDataverse;
private MetadataTransactionContext mdTxnCtx;
@@ -173,8 +173,13 @@
config = new HashMap<>();
}
- public String getPropertyValue(String propertyName) {
- return config.get(propertyName);
+ @SuppressWarnings("unchecked")
+ public <T> T getProperty(String name) {
+ return (T) config.get(name);
+ }
+
+ public void setProperty(String name, Object value) {
+ config.put(name, value);
}
public void disableBlockingOperator() {
@@ -186,7 +191,7 @@
}
@Override
- public Map<String, String> getConfig() {
+ public Map<String, Object> getConfig() {
return config;
}
@@ -296,7 +301,7 @@
*/
public ARecordType findOutputRecordType() throws AlgebricksException {
return MetadataManagerUtil.findOutputRecordType(mdTxnCtx,
getDefaultDataverseName(),
- getPropertyValue("output-record-type"));
+ getProperty("output-record-type"));
}
public Dataset findDataset(String dataverse, String dataset) throws
AlgebricksException {
@@ -1545,6 +1550,7 @@
}
}
+ @Override
public AsterixTupleFilterFactory
createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr,
JobGenContext context)
throws AlgebricksException {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index d0a22b7..8471d45 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -682,7 +682,7 @@
datasetPartitions, isSink);
}
- public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+ public IFrameOperationCallbackFactory
getFrameOpCallbackFactory(MetadataProvider mdProvider) {
return NoOpFrameOperationCallbackFactory.INSTANCE;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6d81145..bbdfadf 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -447,7 +447,7 @@
RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes,
outputTypeTraits);
op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc,
fieldPermutation, idfh,
missingWriterFactory, modificationCallbackFactory,
searchCallbackFactory,
- dataset.getFrameOpCallbackFactory(), numKeys, itemType,
fieldIdx, hasSecondaries);
+ dataset.getFrameOpCallbackFactory(metadataProvider), numKeys,
itemType, fieldIdx, hasSecondaries);
return new Pair<>(op, splitsAndConstraint.second);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index dba6760..3df1b13 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -256,13 +256,18 @@
@Override
public void frameCompleted() throws HyracksDataException {
- callback.frameCompleted();
appender.write(writer, true);
+ callback.frameCompleted();
}
@Override
public void close() throws IOException {
callback.close();
+ }
+
+ @Override
+ public void fail(Throwable th) {
+ callback.fail(th);
}
};
} catch (Throwable e) { // NOSONAR: Re-thrown
@@ -305,7 +310,12 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+ try {
+ lsmAccessor.batchOperate(accessor, tuple, processor,
frameOpCallback);
+ } catch (Throwable th) {// NOSONAR: Must notify of all failures
+ frameOpCallback.fail(th);
+ throw th;
+ }
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index be227ec..c0d18df 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.transaction.management.service.logging;
-import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
-
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -705,7 +703,7 @@
}
} catch (Exception e) {
LOGGER.log(Level.ERROR, "LogFlusher is terminating abnormally.
System is in unusable state; halting", e);
- ExitUtil.halt(EC_IMMEDIATE_HALT);
+ ExitUtil.halt(ExitUtil.EC_TXN_LOG_FLUSHER_FAILURE);
throw new AssertionError("not reachable");
} finally {
if (interrupted) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index ae66ee2..efa9c1c 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -212,5 +212,6 @@
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr,
JobGenContext context)
throws AlgebricksException;
- public Map<String, String> getConfig();
+ public Map<String, Object> getConfig();
+
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 066b6c1..9ca4bdb 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -580,7 +580,7 @@
}
case UNORDERED_PARTITIONED: {
List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
- String hashMergeHint = context.getMetadataProvider().getConfig().get(HASH_MERGE);
+ String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE);
if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
pop = new HashPartitionExchangePOperator(varList, domain);
break;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 627e972..77623c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -73,7 +73,7 @@
}
if (registrationException != null) {
LOGGER.fatal("Registering with {} failed with exception", this, registrationException);
- ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
+ ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
}
return getCcId();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 040fe03..cde26d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.ipc.impl;
-import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -344,7 +342,7 @@
}
} catch (Exception e) {
LOGGER.fatal("Unrecoverable networking failure; Halting...", e);
- ExitUtil.halt(EC_IMMEDIATE_HALT);
+ ExitUtil.halt(ExitUtil.EC_NETWORK_FAILURE);
}
return false;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index df78c53..78e5862 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -33,4 +33,11 @@
* @throws HyracksDataException
*/
void frameCompleted() throws HyracksDataException;
+
+ /**
+ * Called when the task has failed.
+ *
+ * @param th the failure that caused the task to fail
+ */
+ void fail(Throwable th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 14cfc59..652cb34 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -40,6 +40,9 @@
public static final int EC_INCONSISTENT_METADATA = 8;
public static final int EC_UNCAUGHT_THROWABLE = 9;
public static final int EC_UNHANDLED_EXCEPTION = 11;
+ public static final int EC_TXN_LOG_FLUSHER_FAILURE = 12;
+ public static final int EC_NODE_REGISTRATION_FAILURE = 13;
+ public static final int EC_NETWORK_FAILURE = 14;
public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
--
To view, visit https://asterix-gerrit.ics.uci.edu/2795
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>