Luo Chen has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2825
Change subject: [ASTERIXDB-2429] Fix the upsert of primary key index ...................................................................... [ASTERIXDB-2429] Fix the upsert of primary key index - user model changes: no - storage format changes: no - interface changes: no Details: - Previously the primary key index was not properly maintained during upsert. Since there is no secondary key in the primary key index, the old value would always point to the primary key, which is always equal to the new value. As a result, the primary key index is never maintained during upsert. - This patch fixes this bug with two changes: First, if there is a primary key index, we would perform upsert anyway no matter whether old value == new value. Second, use a boolean variable to indicate whether the operation is upsert or delete since for the primary key index, old value cannot provide such information. Change-Id: I925bd42ba67f70e94f5f5bc2d24151c8e2e20baf --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java 22 files changed, 309 insertions(+), 34 deletions(-) git 
pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/25/2825/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java index ae74832..e123715 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java @@ -496,6 +496,12 @@ indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates)); } } + + if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) { + indexUpdate.setUpsertIndicatorExpr(new MutableObject<>( + new VariableReferenceExpression(primaryIndexModificationOp.getUpsertIndicatorVar()))); + } + context.computeAndSetTypeEnvironmentForOperator(indexUpdate); if (!primaryIndexModificationOp.isBulkload() || secondaryIndexTotalCnt == 1) { currentTop = indexUpdate; diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index aa4fb75..2c1e0f7 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -92,6 +92,7 @@ import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.functions.FunctionInfo; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; @@ -522,6 +523,8 @@ // A change feed, we don't need the assign to access PKs upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false); + upsertOp.setUpsertIndicatorVar(context.newVar()); + upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN); // Create and add a new variable used for representing the original record upsertOp.setPrevRecordVar(context.newVar()); upsertOp.setPrevRecordType(targetDatasource.getItemType()); @@ -556,6 +559,8 @@ upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); upsertOp.getInputs().add(new MutableObject<>(assign)); upsertOp.setSourceLocation(sourceLoc); + upsertOp.setUpsertIndicatorVar(context.newVar()); + upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN); // Create and add a new variable used for representing the original record ARecordType recordType = (ARecordType) targetDatasource.getItemType(); upsertOp.setPrevRecordVar(context.newVar()); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java index f9f57a3..c312bdf 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java @@ -34,7 +34,7 @@ */ @RunWith(Parameterized.class) public class SqlppExecutionTest { - protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; + protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-lsm.conf"; @BeforeClass public static void setUp() throws Exception { diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp new file mode 100644 index 0000000..332aa0d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Create a change feed with meta-data, create a primary key index and then ingest data (with deletes) + * Expected Res : Success + * Date : 18th Jun 2018 + */ + +drop dataverse KeyVerse if exists; +create dataverse KeyVerse; +use KeyVerse; + +create type DocumentType as open{ +}; + +create type KVMetaType as open{ +`key`:string, +vbucket:int32, +seq:int64, +cas:int64, +expiration:int32, +flags:int32, +revSeq:int64, +lockTime:int32 +}; + +create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta().`key`; +create primary index primary_idx on KVStore; + +create feed KVChangeStream with { + "adapter-name" : "adapter", + "type-name" : "DocumentType", + "meta-type-name" : "KVMetaType", + "reader" : "org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory", + "parser" : "record-with-metadata", + "format" : "dcp", + "record-format" : "json", + "change-feed" : "true", + "key-indexes" : "0", + "key-indicators" : "1", + "num-of-records" : "1000", + "delete-cycle" : "5" +}; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp new file mode 100644 index 0000000..03a83ff --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use KeyVerse; + +set `wait-for-completion-feed` "true"; +connect feed KVChangeStream to dataset KVStore; + +start feed KVChangeStream; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp new file mode 100644 index 0000000..6e29992 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use KeyVerse; + +select count(*) +from KVStore x; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp new file mode 100644 index 0000000..89469bd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +drop dataverse KeyVerse; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp new file mode 100644 index 0000000..3f08b2f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Upsert into a dataset which has a b-tree secondary index + * Expected Res : Success + * Date : Sep 15th 2015 + */ + +use test; + +select count(*) +from UpsertTo x; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm new file mode 100644 index 0000000..4c59618 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm @@ -0,0 +1 @@ +{ "$1": 801 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm new file mode 100644 index 0000000..71c9709 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm @@ -0,0 +1 @@ +{ "$1": 9 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index ada210e..6d3b4db 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -8865,6 +8865,11 @@ </compilation-unit> </test-case> <test-case FilePath="feeds"> + <compilation-unit name="change-feed-with-meta-pk-index"> + <output-dir compare="Text">change-feed-with-meta-pk-index</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="feeds"> <compilation-unit name="change-feed-with-meta-with-mixed-index"> <output-dir compare="Text">change-feed-with-meta-with-mixed-index</output-dir> 
</compilation-unit> diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index f3f5c56..f72d34b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -57,6 +57,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedConstants; import org.apache.asterix.formats.base.IDataFormat; +import org.apache.asterix.formats.nontagged.BinaryBooleanInspector; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; @@ -668,7 +669,7 @@ boolean bulkload) throws AlgebricksException { return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, - context, spec, bulkload, null, null); + context, spec, bulkload, null, null, null); } @Override @@ -680,7 +681,7 @@ throws AlgebricksException { return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, - context, spec, false, null, null); + context, spec, false, null, null, null); } @Override @@ -688,12 +689,12 @@ IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys, - ILogicalExpression filterExpr, 
List<LogicalVariable> prevSecondaryKeys, + ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException { return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc, - context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey); + context, spec, false, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey); } @Override @@ -1042,8 +1043,8 @@ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload, - List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey) - throws AlgebricksException { + LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys, + LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException { String indexName = dataSourceIndex.getId(); String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName(); String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName(); @@ -1062,18 +1063,19 @@ case BTREE: return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, - bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys); + bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys); case RTREE: return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, - 
bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys); + bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys); case SINGLE_PARTITION_WORD_INVIX: case SINGLE_PARTITION_NGRAM_INVIX: case LENGTH_PARTITIONED_WORD_INVIX: case LENGTH_PARTITIONED_NGRAM_INVIX: return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, - secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys); + secondaryIndex.getIndexType(), bulkload, upsertIndicatorVar, prevSecondaryKeys, + prevAdditionalFilteringKeys); default: throw new AlgebricksException( indexOp.name() + "Insert, upsert, and delete not implemented for index type: " @@ -1085,8 +1087,9 @@ String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context, - JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys, - List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException { + JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar, + List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys) + throws AlgebricksException { Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); int numKeys = primaryKeys.size() + secondaryKeys.size(); int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 
0 : 1; @@ -1153,8 +1156,10 @@ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); } else if (indexOp == IndexOperation.UPSERT) { + int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, - filterFactory, modificationCallbackFactory, prevFieldPermutation); + filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex, + BinaryBooleanInspector.FACTORY, prevFieldPermutation); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, filterFactory, false, modificationCallbackFactory); @@ -1169,8 +1174,9 @@ String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context, - JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys, - List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException { + JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar, + List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys) + throws AlgebricksException { Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); String itemTypeName = dataset.getItemTypeName(); IAType itemType = MetadataManager.INSTANCE @@ -1250,8 +1256,10 @@ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); } else if (indexOp == IndexOperation.UPSERT) { + int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); op = new 
LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, - indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation); + indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex, + BinaryBooleanInspector.FACTORY, prevFieldPermutation); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory); @@ -1264,8 +1272,8 @@ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload, - List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys) - throws AlgebricksException { + LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys, + List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException { // Check the index is length-partitioned or not. 
boolean isPartitioned; if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX @@ -1359,8 +1367,10 @@ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); } else if (indexOp == IndexOperation.UPSERT) { + int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory, - filterFactory, modificationCallbackFactory, prevFieldPermutation); + filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex, + BinaryBooleanInspector.FACTORY, prevFieldPermutation); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, indexDataFlowFactory, filterFactory, false, modificationCallbackFactory); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index bbdfadf..28c612f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -407,15 +407,20 @@ IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); LSMPrimaryUpsertOperatorDescriptor op; - ITypeTraits[] outputTypeTraits = - new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; - ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + 1 + + (dataset.hasMetaPart() ? 
2 : 1) + numFilterFields]; + ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; IDataFormat dataFormat = metadataProvider.getDataFormat(); - // add the previous record first int f = 0; + // add the upsert indicator var + outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN); + outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN); + f++; + // add the previous record outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(itemType); + outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType); f++; // add the previous meta second if (dataset.hasMetaPart()) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 3df1b13..4f8c9fb 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.ILogMarkerCallback; import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback; +import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; @@ -158,11 +159,13 @@ if (cursor.hasNext()) { cursor.next(); prevTuple = cursor.getTuple(); + appendUpsertIndicator(!isDelete); appendFilterToPrevTuple(); appendPrevRecord(); appendPreviousMeta(); appendFilterToOutput(); } else { + 
appendUpsertIndicator(!isDelete); appendPreviousTupleAsMissing(); } } finally { @@ -170,6 +173,7 @@ } } else { searchCallback.before(key); // lock + appendUpsertIndicator(!isDelete); appendPreviousTupleAsMissing(); } if (isDelete && prevTuple != null) { @@ -330,6 +334,14 @@ } } + private void appendUpsertIndicator(boolean isUpsert) throws IOException { + if (!isUpsert) { + System.out.println(); + } + recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : ABoolean.FALSE, dos); + tb.addFieldEndOffset(); + } + private void appendPrevRecord() throws IOException { dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys), prevTuple.getFieldLength(numOfPrimaryKeys)); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java index 958288a..df658b6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java @@ -19,6 +19,7 @@ package org.apache.asterix.runtime.operators; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; +import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -34,14 +35,19 @@ private static final long serialVersionUID = 1L; private final int[] prevValuePermutation; + private final int upsertIndiatorFieldIndex; + private final IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory; public LSMSecondaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor 
outRecDesc, int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory, + int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory, int[] prevValuePermutation) { super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, tupleFilterFactory, false, modificationOpCallbackFactory); this.prevValuePermutation = prevValuePermutation; + this.upsertIndiatorFieldIndex = upsertIndicatorFieldIndex; + this.upsertIndicatorInspectorFactory = upsertIndicatorInspectorFactory; } @Override @@ -49,6 +55,7 @@ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, modCallbackFactory, - tupleFilterFactory, fieldPermutation, intputRecDesc, prevValuePermutation); + tupleFilterFactory, fieldPermutation, intputRecDesc, upsertIndiatorFieldIndex, + upsertIndicatorInspectorFactory, prevValuePermutation); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java index b928131..c6bff2c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java @@ -19,15 +19,19 @@ package org.apache.asterix.runtime.operators; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.TypeTagUtil; import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; +import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector; +import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.utils.TupleUtils; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; @@ -56,17 +60,26 @@ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable { private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference(); + private final int upsertIndicatorFieldIndex; + private final IBinaryBooleanInspector upsertIndicatorInspector; private final int numberOfFields; private AbstractIndexModificationOperationCallback abstractModCallback; + private final FrameTupleReference frameTuple = new FrameTupleReference(); + private final boolean isPrimaryKeyIndex; public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory, ITupleFilterFactory tupleFilterFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc, + int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory, int[] prevValuePermutation) throws HyracksDataException { super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, 
IndexOperation.UPSERT, modCallbackFactory, tupleFilterFactory); this.prevValueTuple.setFieldPermutation(prevValuePermutation); + this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex; + this.upsertIndicatorInspector = upsertIndicatorInspectorFactory.createBinaryBooleanInspector(ctx); this.numberOfFields = prevValuePermutation.length; + // a primary key index only has primary keys, and thus these two permutations are the same + this.isPrimaryKeyIndex = Arrays.equals(fieldPermutation, prevValuePermutation); } @Override @@ -82,9 +95,15 @@ int tupleCount = accessor.getTupleCount(); for (int i = 0; i < tupleCount; i++) { try { + frameTuple.reset(accessor, i); + boolean isUpsert = + upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex), + frameTuple.getFieldStart(upsertIndicatorFieldIndex), + frameTuple.getFieldLength(upsertIndicatorFieldIndex)); // if both previous value and new value are null, then we skip tuple.reset(accessor, i); prevValueTuple.reset(accessor, i); + boolean isNewValueMissing = isMissing(tuple, 0); boolean isOldValueMissing = isMissing(prevValueTuple, 0); if (isNewValueMissing && isOldValueMissing) { @@ -92,8 +111,10 @@ continue; } // At least, one is not null - // If they are equal, then we skip - if (TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) { + if (!isPrimaryKeyIndex && TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) { + // For a secondary index, if the secondary key values do not change, we can skip upserting it. 
+ // However, for a primary key index, we cannot do this because it only contains primary keys + // which are always the same continue; } if (!isOldValueMissing) { @@ -101,7 +122,7 @@ abstractModCallback.setOp(Operation.DELETE); lsmAccessor.forceDelete(prevValueTuple); } - if (!isNewValueMissing) { + if (isUpsert && !isNewValueMissing) { // we need to insert the new value abstractModCallback.setOp(Operation.INSERT); lsmAccessor.forceInsert(tuple); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java index efa9c1c..3d004a2 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java @@ -205,8 +205,9 @@ IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr, - List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys, - RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException; + LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys, + LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context, + JobSpecification spec) throws AlgebricksException; public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context) diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java index 31a1294..827fcc0 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java @@ -49,6 +49,7 @@ // used for upsert operations private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs; private Mutable<ILogicalExpression> prevAdditionalFilteringExpression; + private Mutable<ILogicalExpression> upsertIndicatorExpr; private final int numberOfAdditionalNonFilteringFields; public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex, @@ -189,4 +190,12 @@ public int getNumberOfAdditionalNonFilteringFields() { return numberOfAdditionalNonFilteringFields; } + + public Mutable<ILogicalExpression> getUpsertIndicatorExpr() { + return upsertIndicatorExpr; + } + + public void setUpsertIndicatorExpr(Mutable<ILogicalExpression> upsertIndicatorExpr) { + this.upsertIndicatorExpr = upsertIndicatorExpr; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java index 9838c12..ae90462 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java @@ -59,6 +59,9 @@ // previous additional fields (for UPSERT) private List<LogicalVariable> prevAdditionalNonFilteringVars; private List<Object> prevAdditionalNonFilteringTypes; + // a boolean variable that indicates whether it's a delete operation (false) or upsert operation (true) + private LogicalVariable upsertIndicatorVar; + private Object upsertIndicatorVarType; public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr, List<Mutable<ILogicalExpression>> primaryKeyExprs, @@ -85,6 +88,7 @@ public void recomputeSchema() throws AlgebricksException { schema = new ArrayList<LogicalVariable>(); if (operation == Kind.UPSERT) { + schema.add(upsertIndicatorVar); // The upsert case also produces the previous record schema.add(prevRecordVar); if (additionalNonFilteringExpressions != null) { @@ -98,6 +102,9 @@ } public void getProducedVariables(Collection<LogicalVariable> producedVariables) { + if (upsertIndicatorVar != null) { + producedVariables.add(upsertIndicatorVar); + } if (prevRecordVar != null) { producedVariables.add(prevRecordVar); } @@ -147,6 +154,7 @@ public void propagateVariables(IOperatorSchema target, IOperatorSchema... 
sources) throws AlgebricksException { if (operation == Kind.UPSERT) { + target.addVariable(upsertIndicatorVar); target.addVariable(prevRecordVar); if (prevAdditionalNonFilteringVars != null) { for (LogicalVariable var : prevAdditionalNonFilteringVars) { @@ -171,6 +179,7 @@ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx); if (operation == Kind.UPSERT) { + env.setVarType(upsertIndicatorVar, upsertIndicatorVarType); env.setVarType(prevRecordVar, prevRecordType); if (prevAdditionalNonFilteringVars != null) { for (int i = 0; i < prevAdditionalNonFilteringVars.size(); i++) { @@ -224,6 +233,22 @@ this.prevRecordVar = prevRecordVar; } + public LogicalVariable getUpsertIndicatorVar() { + return upsertIndicatorVar; + } + + public void setUpsertIndicatorVar(LogicalVariable upsertIndicatorVar) { + this.upsertIndicatorVar = upsertIndicatorVar; + } + + public Object getUpsertIndicatorVarType() { + return upsertIndicatorVarType; + } + + public void setUpsertIndicatorVarType(Object upsertIndicatorVarType) { + this.upsertIndicatorVarType = upsertIndicatorVarType; + } + public void setPrevRecordType(Object recordType) { prevRecordType = recordType; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index d57a998..e66809e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -421,6 +421,9 @@ 
e.getValue().getUsedVariables(usedVariables); } } + if (op.getUpsertIndicatorExpr() != null) { + op.getUpsertIndicatorExpr().getValue().getUsedVariables(usedVariables); + } return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java index 92fa86d..228ca52 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java @@ -54,14 +54,16 @@ private final ILogicalExpression filterExpr; private final IDataSourceIndex<?, ?> dataSourceIndex; private final List<LogicalVariable> additionalFilteringKeys; + private final LogicalVariable upsertIndicatorVar; private final List<LogicalVariable> prevSecondaryKeys; private final LogicalVariable prevAdditionalFilteringKey; private final int numOfAdditionalNonFilteringFields; public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr, - IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys, - LogicalVariable prevAdditionalFilteringKey, int numOfAdditionalNonFilteringFields) { + IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable upsertIndicatorVar, + List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey, + int numOfAdditionalNonFilteringFields) { this.primaryKeys = primaryKeys; this.secondaryKeys = secondaryKeys; if (filterExpr != null) { @@ -71,6 +73,7 @@ } this.dataSourceIndex = 
dataSourceIndex; this.additionalFilteringKeys = additionalFilteringKeys; + this.upsertIndicatorVar = upsertIndicatorVar; this.prevSecondaryKeys = prevSecondaryKeys; this.prevAdditionalFilteringKey = prevAdditionalFilteringKey; this.numOfAdditionalNonFilteringFields = numOfAdditionalNonFilteringFields; @@ -132,8 +135,8 @@ break; case UPSERT: runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, - typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys, - prevAdditionalFilteringKey, inputDesc, context, spec); + typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, upsertIndicatorVar, + prevSecondaryKeys, prevAdditionalFilteringKey, inputDesc, context, spec); break; default: throw new AlgebricksException("Unsupported Operation " + operation); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index d277043..fdedece 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -356,9 +356,13 @@ new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex())); } else { + LogicalVariable upsertIndicatorVar = null; List<LogicalVariable> prevSecondaryKeys = null; LogicalVariable prevAdditionalFilteringKey = null; if (opInsDel.getOperation() == Kind.UPSERT) { + upsertIndicatorVar = + ((VariableReferenceExpression) opInsDel.getUpsertIndicatorExpr().getValue()) + .getVariableReference(); 
prevSecondaryKeys = new ArrayList<LogicalVariable>(); getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys); if (opInsDel.getPrevAdditionalFilteringExpression() != null) { @@ -369,7 +373,7 @@ } op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), - prevSecondaryKeys, prevAdditionalFilteringKey, + upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey, opInsDel.getNumberOfAdditionalNonFilteringFields())); } break; -- To view, visit https://asterix-gerrit.ics.uci.edu/2825 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I925bd42ba67f70e94f5f5bc2d24151c8e2e20baf Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <cl...@uci.edu>