abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1308
Change subject: Fix Upsert Pipeline
......................................................................
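Fix Upsert Pipeline

- Emit the previous record (followed by the previous meta record and the
  previous filter value, when present) ahead of the propagated input
  fields in the upsert operator's output, and make the logical schema
  (InsertDeleteUpsertOperator, SchemaVariableVisitor) follow that order.
- Pass the operator's input schema, rather than its propagated schema,
  to IMetadataProvider.getUpsertRuntime so the field permutations are
  computed against the incoming tuple layout.
- Add a FrameTupleAccessor.prettyPrintTags() debugging helper.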
Change-Id: I5c19d448f9664ecaeac600668a6dbdcf40673c56
---
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
7 files changed, 96 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/08/1308/1
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index e05aa25..619ef94 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -26,10 +26,10 @@
import java.util.Map;
import org.apache.asterix.common.config.AsterixStorageProperties;
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.context.ITransactionSubsystemProvider;
import org.apache.asterix.common.context.TransactionSubsystemProvider;
@@ -42,8 +42,8 @@
import
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
import
org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -1097,7 +1097,7 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getUpsertRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema
propagatedSchema, IVariableTypeEnvironment typeEnv,
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema inputSchema,
IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys, LogicalVariable payload,
List<LogicalVariable> filterKeys,
List<LogicalVariable> additionalNonFilterFields, RecordDescriptor
recordDesc, JobGenContext context,
JobSpecification spec) throws AlgebricksException {
@@ -1119,22 +1119,22 @@
int i = 0;
// set the keys' permutations
for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
+ int idx = inputSchema.findVariable(varKey);
fieldPermutation[i] = idx;
bloomFilterKeyFields[i] = i;
i++;
}
// set the record permutation
- fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+ fieldPermutation[i++] = inputSchema.findVariable(payload);
// set the filters' permutations.
if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(filterKeys.get(0));
+ int idx = inputSchema.findVariable(filterKeys.get(0));
fieldPermutation[i++] = idx;
}
if (additionalNonFilterFields != null) {
for (LogicalVariable var : additionalNonFilterFields) {
- int idx = propagatedSchema.findVariable(var);
+ int idx = inputSchema.findVariable(var);
fieldPermutation[i++] = idx;
}
}
@@ -1195,22 +1195,23 @@
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
ISerializerDeserializer[] outputSerDes = new
ISerializerDeserializer[recordDesc.getFieldCount()
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
- for (int j = 0; j < recordDesc.getFieldCount(); j++) {
- outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
- outputSerDes[j] = recordDesc.getFields()[j];
- }
- outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1)
- numFilterFields] = FormatUtils
+ // add the previous record first
+ int f = 0;
+ outputSerDes[f] = FormatUtils
.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
- outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart()
? 2 : 1) - numFilterFields] = FormatUtils
+ outputTypeTraits[f] = FormatUtils
.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
-
+ f++;
+ // add the previous meta second
if (dataset.hasMetaPart()) {
- outputSerDes[outputSerDes.length - 1 - numFilterFields] =
FormatUtils.getDefaultFormat()
+
+ outputSerDes[f] = FormatUtils.getDefaultFormat()
.getSerdeProvider().getSerializerDeserializer(metaItemType);
- outputTypeTraits[outputTypeTraits.length - 1 -
numFilterFields] = FormatUtils.getDefaultFormat()
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat()
.getTypeTraitProvider().getTypeTrait(metaItemType);
+ f++;
}
-
+ // add the previous filter third
int fieldIdx = -1;
if (numFilterFields > 0) {
String filterField =
DatasetUtils.getFilterField(dataset).get(0);
@@ -1220,12 +1221,17 @@
}
}
fieldIdx = i;
- outputTypeTraits[outputTypeTraits.length - 1] =
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ outputTypeTraits[f] =
FormatUtils.getDefaultFormat().getTypeTraitProvider()
.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
- outputSerDes[outputSerDes.length - 1] =
FormatUtils.getDefaultFormat().getSerdeProvider()
+ outputSerDes[f] =
FormatUtils.getDefaultFormat().getSerdeProvider()
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ f++;
}
+ for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j+f] = recordDesc.getTypeTraits()[j];
+ outputSerDes[j+f] = recordDesc.getFields()[j];
+ }
RecordDescriptor outputRecordDesc = new
RecordDescriptor(outputSerDes, outputTypeTraits);
op = new AsterixLSMTreeUpsertOperatorDescriptor(spec,
outputRecordDesc,
appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(),
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index afd6019..65b3188 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -141,8 +141,8 @@
.createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAsterixAppRuntimeContext runtimeCtx =
- (IAsterixAppRuntimeContext)
ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext)
ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
runtimeCtx.getTransactionSubsystem().getLogManager());
} catch (Exception e) {
@@ -156,40 +156,11 @@
searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
}
- private void writeOutput(int tupleIndex, boolean recordWasInserted) throws
IOException {
- boolean recordWasDeleted = prevTuple != null;
- tb.reset();
+ private void writeOutput(int tupleIndex, boolean recordWasInserted,
boolean recordWasDeleted) throws IOException {
frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
frameTuple.getFieldLength(i));
tb.addFieldEndOffset();
- }
- if (recordWasDeleted) {
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys),
prevTuple.getFieldStart(numOfPrimaryKeys),
- prevTuple.getFieldLength(numOfPrimaryKeys));
- tb.addFieldEndOffset();
- // if has meta, then append meta
- if (hasMeta) {
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1),
prevTuple.getFieldStart(numOfPrimaryKeys + 1),
- prevTuple.getFieldLength(numOfPrimaryKeys + 1));
- tb.addFieldEndOffset();
- }
- // if with filters, append the filter
- if (isFiltered) {
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys + (hasMeta ?
2 : 1)),
- prevTuple.getFieldStart(numOfPrimaryKeys + (hasMeta ?
2 : 1)),
- prevTuple.getFieldLength(numOfPrimaryKeys + (hasMeta ?
2 : 1)));
- tb.addFieldEndOffset();
- }
- } else {
- addNullField();
- if (hasMeta) {
- addNullField();
- }
- // if with filters, append null
- if (isFiltered) {
- addNullField();
- }
}
if (recordWasInserted || recordWasDeleted) {
FrameUtils.appendToWriter(writer, appender,
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
@@ -214,6 +185,7 @@
int i = 0;
try {
while (i < tupleCount) {
+ tb.reset();
boolean recordWasInserted = false;
tuple.reset(accessor, i);
resetSearchPredicate(i);
@@ -222,10 +194,27 @@
cursor.next();
prevTuple = cursor.getTuple();
cursor.reset();
- modCallback.setOp(Operation.DELETE);
if (isFiltered) {
prevTuple = getPrevTupleWithFilter(prevTuple);
}
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys),
prevTuple.getFieldStart(numOfPrimaryKeys),
+ prevTuple.getFieldLength(numOfPrimaryKeys));
+ tb.addFieldEndOffset();
+ // if has meta, then append meta
+ if (hasMeta) {
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys +
1), prevTuple.getFieldStart(numOfPrimaryKeys
+ + 1),
+ prevTuple.getFieldLength(numOfPrimaryKeys +
1));
+ tb.addFieldEndOffset();
+ }
+ // if with filters, append the filter
+ if (isFiltered) {
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys +
(hasMeta ? 2 : 1)),
+ prevTuple.getFieldStart(numOfPrimaryKeys +
(hasMeta ? 2 : 1)),
+ prevTuple.getFieldLength(numOfPrimaryKeys +
(hasMeta ? 2 : 1)));
+ tb.addFieldEndOffset();
+ }
+ modCallback.setOp(Operation.DELETE);
if (i == 0) {
lsmAccessor.delete(prevTuple);
} else {
@@ -233,6 +222,14 @@
}
} else {
prevTuple = null;
+ addNullField();
+ if (hasMeta) {
+ addNullField();
+ }
+ // if with filters, append null
+ if (isFiltered) {
+ addNullField();
+ }
cursor.reset();
}
if (!isNull(tuple, numOfPrimaryKeys)) {
@@ -244,7 +241,7 @@
}
recordWasInserted = true;
}
- writeOutput(i, recordWasInserted);
+ writeOutput(i, recordWasInserted, prevTuple != null);
i++;
}
appender.write(writer, true);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 68710a5..6947942 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -193,7 +193,7 @@
public IFunctionInfo lookupFunction(FunctionIdentifier fid);
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getUpsertRuntime(IDataSource<S> dataSource,
- IOperatorSchema propagatedSchema, IVariableTypeEnvironment
typeEnv, List<LogicalVariable> keys,
+ IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable>
additionalFilterFields,
List<LogicalVariable> additionalNonFilteringFields,
RecordDescriptor recordDesc, JobGenContext context,
JobSpecification jobSpec) throws AlgebricksException;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 5dc327a..9838c12 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -84,7 +84,6 @@
@Override
public void recomputeSchema() throws AlgebricksException {
schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
if (operation == Kind.UPSERT) {
// The upsert case also produces the previous record
schema.add(prevRecordVar);
@@ -95,6 +94,7 @@
schema.add(prevFilterVar);
}
}
+ schema.addAll(inputs.get(0).getValue().getSchema());
}
public void getProducedVariables(Collection<LogicalVariable>
producedVariables) {
@@ -146,7 +146,6 @@
@Override
public void propagateVariables(IOperatorSchema target,
IOperatorSchema... sources)
throws AlgebricksException {
- target.addAllVariables(sources[0]);
if (operation == Kind.UPSERT) {
target.addVariable(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
@@ -158,6 +157,7 @@
target.addVariable(prevFilterVar);
}
}
+ target.addAllVariables(sources[0]);
}
};
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 28f4e5e..59ccd84 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -34,11 +34,11 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -295,7 +295,12 @@
@Override
public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op,
Void arg) throws AlgebricksException {
- standardLayout(op);
+ // produced first
+ VariableUtilities.getProducedVariables(op, schemaVariables);
+ // then propagated
+ for (Mutable<ILogicalOperator> c : op.getInputs()) {
+ VariableUtilities.getLiveVariables(c.getValue(), schemaVariables);
+ }
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 3c9cddf..6ded4a3 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -114,7 +114,7 @@
runtimeAndConstraints = mp.getDeleteRuntime(dataSource,
propagatedSchema, typeEnv, keys, payload,
additionalFilteringKeys, inputDesc, context, spec);
} else if (operation == Kind.UPSERT) {
- runtimeAndConstraints = mp.getUpsertRuntime(dataSource,
propagatedSchema, typeEnv, keys, payload,
+ runtimeAndConstraints = mp.getUpsertRuntime(dataSource,
inputSchemas[0], typeEnv, keys, payload,
additionalFilteringKeys, additionalNonFilteringFields,
inputDesc, context, spec);
} else {
throw new AlgebricksException("Unsupported Operation " +
operation);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index d99e2f2..5dfa78a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -134,6 +134,35 @@
prettyPrint("");
}
+ public void prettyPrintTags(String operator) {
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis);
+ int tc = getTupleCount();
+ StringBuilder sb = new StringBuilder();
+ sb.append(operator + ":");
+ sb.append("TC: " + tc).append("\n");
+ for (int i = 0; i < tc; ++i) {
+ prettyPrintTag(i, bbis, dis, sb);
+ }
+ System.err.println(sb.toString());
+ }
+
+ protected void prettyPrintTag(int tid, ByteBufferInputStream bbis,
DataInputStream dis, StringBuilder sb) {
+ sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " +
getTupleEndOffset(tid) + ")[");
+ for (int j = 0; j < getFieldCount(); ++j) {
+ sb.append(" ");
+ if (j > 0) {
+ sb.append("|");
+ }
+ sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " +
getFieldEndOffset(tid, j) + ") ");
+ sb.append("{");
+ sb.append(Byte.toString(buffer.array()[getTupleStartOffset(tid) +
getFieldSlotsLength()
+ + getFieldStartOffset(tid, j)]));
+ sb.append("}");
+ }
+ sb.append("\n");
+ }
+
protected void prettyPrint(int tid, ByteBufferInputStream bbis,
DataInputStream dis, StringBuilder sb) {
sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " +
getTupleEndOffset(tid) + ")[");
for (int j = 0; j < getFieldCount(); ++j) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1308
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I5c19d448f9664ecaeac600668a6dbdcf40673c56
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>