From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18425 )
Change subject: WIP: primary keys function
......................................................................
WIP: primary keys function
Change-Id: Idb1182a85c1ccff698af81cdbd3c527b5418f2e4
---
A
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysEvaluator.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
A
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/PrimaryKeysTypeComputer.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
A
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysDescriptor.java
5 files changed, 282 insertions(+), 0 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/25/18425/1
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 21b8b60..001b1c4 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -118,6 +118,7 @@
import org.apache.asterix.om.typecomputer.impl.OrderedListOfAPointTypeComputer;
import
org.apache.asterix.om.typecomputer.impl.OrderedListOfAStringTypeComputer;
import org.apache.asterix.om.typecomputer.impl.OrderedListOfAnyTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.PrimaryKeysTypeComputer;
import org.apache.asterix.om.typecomputer.impl.PropagateTypeComputer;
import org.apache.asterix.om.typecomputer.impl.PutAutogeneratedKeyTypeComputer;
import org.apache.asterix.om.typecomputer.impl.RecordAddFieldsTypeComputer;
@@ -1287,6 +1288,8 @@
public static final FunctionIdentifier PUT_AUTOGENERATED_KEY =
FunctionConstants.newAsterix("put-autogenerated-key",
FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier PRIMARY_KEYS =
+ FunctionConstants.newAsterix("primary-keys",
FunctionIdentifier.VARARGS);
static {
// first, take care of Algebricks builtin functions
@@ -2146,6 +2149,7 @@
// used by UPSERT/INSERT for collections with autogenerated uuid
addPrivateFunction(PUT_AUTOGENERATED_KEY,
PutAutogeneratedKeyTypeComputer.INSTANCE, false);
+ addPrivateFunction(PRIMARY_KEYS, PrimaryKeysTypeComputer.INSTANCE,
false);
}
static {
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/PrimaryKeysTypeComputer.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/PrimaryKeysTypeComputer.java
new file mode 100644
index 0000000..5107095
--- /dev/null
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/PrimaryKeysTypeComputer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.typecomputer.impl;
+
+import java.util.List;
+
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+/**
+ * Result type computer for the private {@code primary-keys} function.
+ * <p>
+ * The type of the first argument must equal {@link RecordUtil#FULLY_OPEN_RECORD_TYPE}. The computed type is
+ * whatever {@link TypeCastUtils#getRequiredType(AbstractFunctionCallExpression)} returns for the call expression.
+ */
+public class PrimaryKeysTypeComputer implements IResultTypeComputer {
+
+    /** Shared instance; the class holds no state and its constructor is private. */
+    public static final PrimaryKeysTypeComputer INSTANCE = new PrimaryKeysTypeComputer();
+
+    private PrimaryKeysTypeComputer() {
+    }
+
+    /**
+     * Validates the first argument's type and returns the required type of the call.
+     *
+     * @param expression the function call; cast to {@link AbstractFunctionCallExpression}
+     * @param env type environment used to look up the type of the first argument
+     * @param metadataProvider not used by this computer
+     * @return the required type read from the call expression
+     * @throws AlgebricksException if the first argument is not typed as the fully open record type
+     */
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+            IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        List<Mutable<ILogicalExpression>> funArgs = f.getArguments();
+        IAType inRecArg = (IAType) env.getType(funArgs.get(0).getValue());
+        if (!RecordUtil.FULLY_OPEN_RECORD_TYPE.equals(inRecArg)) {
+            // Report through the declared checked exception instead of a raw RuntimeException so the
+            // failure travels the same path as other type-computation errors.
+            // NOTE(review): consider an error-coded exception before this leaves WIP.
+            throw new AlgebricksException("Input record should be fully open");
+        }
+        return TypeCastUtils.getRequiredType(f);
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysDescriptor.java
new file mode 100644
index 0000000..e9645a8
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.om.types.ARecordType;
+import
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Function descriptor for {@link BuiltinFunctions#PRIMARY_KEYS}.
+ * <p>
+ * Holds the output record type received through {@link #setImmutableStates(Object...)} and hands it to every
+ * {@link PrimaryKeysEvaluator} it creates.
+ */
+public class PrimaryKeysDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Factory that creates a fresh descriptor per call and selects
+     * {@link FunctionTypeInferers#SET_EXPRESSION_TYPE} as the type inferer.
+     */
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.SET_EXPRESSION_TYPE;
+        }
+
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new PrimaryKeysDescriptor();
+        }
+    };
+
+    /** Output record type; assigned in {@link #setImmutableStates(Object...)}. */
+    private ARecordType outRecType;
+
+    /** @return the identifier of the {@code primary-keys} builtin */
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.PRIMARY_KEYS;
+    }
+
+    /**
+     * Stores the first state as the output record type.
+     *
+     * @param states inferred states; element 0 is cast to {@link ARecordType}
+     *               (presumably supplied by the SET_EXPRESSION_TYPE inferer; confirm)
+     */
+    @Override
+    public void setImmutableStates(Object... states) {
+        outRecType = (ARecordType) states[0];
+    }
+
+    /**
+     * Builds the evaluator factory for this function.
+     *
+     * @param args evaluator factories of the function arguments, passed on unchanged to the evaluator
+     * @return a factory whose evaluators are {@link PrimaryKeysEvaluator} instances
+     */
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        final IScalarEvaluatorFactory evaluatorFactory = new IScalarEvaluatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IEvaluatorContext ctx) throws HyracksDataException {
+                // outRecType, sourceLoc and the identifier are read when the evaluator is created,
+                // not when this factory is built.
+                return new PrimaryKeysEvaluator(ctx, args, outRecType, sourceLoc, getIdentifier());
+            }
+        };
+        return evaluatorFactory;
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysEvaluator.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysEvaluator.java
new file mode 100644
index 0000000..51753af
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/PrimaryKeysEvaluator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
+import org.apache.asterix.runtime.evaluators.functions.AbstractScalarEval;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Runtime evaluator for the private {@code primary-keys} function.
+ * <p>
+ * Evaluates argument 0 to an object and argument 1 to a field name, then rebuilds the object against
+ * {@code outRecType}: the field whose name equals argument 1 is added by its index in {@code outRecType}
+ * (converted to {@code pkOutType} when its type tag differs), and every other field is added by name.
+ */
+public class PrimaryKeysEvaluator extends AbstractScalarEval {
+
+    private final RecordLazyVisitablePointable lazyRecord = new RecordLazyVisitablePointable(true);
+    private final IPointable inRecPointable = new VoidPointable();
+    private final IPointable pkFieldName = new VoidPointable();
+    // scratch buffer holding a converted key value before it is added to the record builder
+    private final ArrayBackedValueStorage tempStorage = new ArrayBackedValueStorage();
+    private final IBinaryComparator stringBinaryComparator =
+            UTF8StringBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    private final RuntimeRecordTypeInfo runtimeRecordTypeInfo = new RuntimeRecordTypeInfo();
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    private final DataOutput out = resultStorage.getDataOutput();
+    private final RecordBuilder recordBuilder = new RecordBuilder();
+    private final ARecordType outRecType;
+    private final IScalarEvaluator inRecEval;
+    private final IScalarEvaluator pkFieldEval;
+    private final ATypeTag pkOutType;
+
+    /**
+     * @param ctx evaluator context used to create the argument evaluators
+     * @param args argument evaluator factories; index 0 is the input record, index 1 the key field name
+     * @param outRecType record type the output is built against
+     * @param sourceLocation source location forwarded to the base class
+     * @param identifier function identifier forwarded to the base class
+     * @throws HyracksDataException if an argument evaluator cannot be created
+     */
+    PrimaryKeysEvaluator(IEvaluatorContext ctx, IScalarEvaluatorFactory[] args, ARecordType outRecType,
+            SourceLocation sourceLocation, FunctionIdentifier identifier) throws HyracksDataException {
+        super(sourceLocation, identifier);
+        this.outRecType = outRecType;
+        // NOTE(review): the key's target type is taken from field 0 of outRecType, while addKeyPart looks the
+        // key's position up by name; this presumably relies on the key being the first declared field. Confirm.
+        pkOutType = outRecType.getFieldTypes()[0].getTypeTag();
+        inRecEval = args[0].createScalarEvaluator(ctx);
+        pkFieldEval = args[1].createScalarEvaluator(ctx);
+        recordBuilder.reset(outRecType);
+        runtimeRecordTypeInfo.reset(outRecType);
+    }
+
+    /**
+     * Evaluates both arguments for {@code tuple} and writes the rebuilt record into {@code result}.
+     * If {@link PointableHelper#checkAndSetMissingOrNull} returns true for the input record, {@code result} is
+     * left as that helper set it and nothing else is done.
+     *
+     * @throws HyracksDataException with {@code CASTING_FIELD} when the input is not an object, or when
+     *                              rebuilding the record fails
+     */
+    @Override
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        pkFieldEval.evaluate(tuple, pkFieldName);
+        inRecEval.evaluate(tuple, inRecPointable);
+        if (PointableHelper.checkAndSetMissingOrNull(result, inRecPointable)) {
+            return;
+        }
+        ATypeTag inputTypeTag = PointableHelper.getTypeTag(inRecPointable);
+        if (inputTypeTag != ATypeTag.OBJECT) {
+            throw new RuntimeDataException(ErrorCode.CASTING_FIELD, srcLoc, inputTypeTag, ATypeTag.OBJECT);
+        }
+        lazyRecord.set(inRecPointable);
+        resultStorage.reset();
+        processTuple();
+        result.set(resultStorage);
+    }
+
+    /** Rebuilds the current record and serializes it into {@code resultStorage}. */
+    private void processTuple() throws HyracksDataException {
+        try {
+            processRecord(lazyRecord);
+            recordBuilder.write(out, true);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Walks the children of {@code record} once, routing the field named by {@code pkFieldName} to
+     * {@link #addKeyPart} and adding all other fields to the builder by name.
+     */
+    private void processRecord(RecordLazyVisitablePointable record) throws IOException {
+        recordBuilder.init();
+        int openFieldCount = record.getNumberOfChildren();
+        for (int i = 0; i < openFieldCount; i++) {
+            record.nextChild();
+            IValueReference fieldValue = record.getChildValue();
+            IValueReference fieldName = record.getFieldName();
+            if (PointableHelper.isEqual(fieldName, pkFieldName, stringBinaryComparator)) {
+                addKeyPart(fieldName, fieldValue);
+            } else {
+                recordBuilder.addField(fieldName, fieldValue);
+            }
+        }
+    }
+
+    /**
+     * Adds the key field at its index in {@code outRecType}, converting the value to {@code pkOutType}
+     * first when its type tag differs.
+     *
+     * @throws IOException a {@code TYPE_CONVERT} {@link RuntimeDataException} (carrying the original failure
+     *                     as its cause) when the conversion fails with a plain {@link IOException};
+     *                     a {@link HyracksDataException} from the conversion is rethrown unchanged
+     */
+    private void addKeyPart(IValueReference keyPartName, IValueReference keyPartVal) throws IOException {
+        ATypeTag inputTypeTag = PointableHelper.getTypeTag(keyPartVal);
+        // The lookup skips the first byte of the name (start + 1, length - 1), presumably its type tag. Confirm.
+        // NOTE(review): the returned index is used unchecked; verify it cannot be negative for a matched name.
+        int pos = runtimeRecordTypeInfo.getFieldIndex(keyPartName.getByteArray(), keyPartName.getStartOffset() + 1,
+                keyPartName.getLength() - 1);
+        if (!needPromote(inputTypeTag, pkOutType)) {
+            recordBuilder.addField(pos, keyPartVal);
+        } else {
+            try {
+                tempStorage.reset();
+                ATypeHierarchy.convertNumericTypeByteArray(keyPartVal.getByteArray(), keyPartVal.getStartOffset(),
+                        keyPartVal.getLength(), pkOutType, tempStorage.getDataOutput(), false);
+                recordBuilder.addField(pos, tempStorage);
+            } catch (HyracksDataException e) {
+                throw e;
+            } catch (IOException e) {
+                // Keep the underlying failure as the cause instead of discarding it.
+                throw new RuntimeDataException(ErrorCode.TYPE_CONVERT, e, srcLoc, inputTypeTag, pkOutType);
+            }
+        }
+    }
+
+    /** @return true when the two type tags differ, i.e. the value must be converted before being stored */
+    private boolean needPromote(ATypeTag tag0, ATypeTag tag1) {
+        return tag0 != tag1;
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 214c8a0..0beec91 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -542,6 +542,7 @@
import
org.apache.asterix.runtime.evaluators.functions.records.GetRecordFieldValueDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.GetRecordFieldsDescriptor;
import org.apache.asterix.runtime.evaluators.functions.records.PairsDescriptor;
+import
org.apache.asterix.runtime.evaluators.functions.records.PrimaryKeysDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.PutAutogeneratedKeyDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordAddDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordAddFieldsDescriptor;
@@ -1341,6 +1342,7 @@
fc.add(RandomWithSeedDescriptor.FACTORY);
fc.add(SerializedSizeDescriptor.FACTORY);
fc.add(PutAutogeneratedKeyDescriptor.FACTORY);
+ fc.add(PrimaryKeysDescriptor.FACTORY);
ServiceLoader.load(IFunctionRegistrant.class).iterator().forEachRemaining(c ->
c.register(fc));
return fc;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18425
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Idb1182a85c1ccff698af81cdbd3c527b5418f2e4
Gerrit-Change-Number: 18425
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange