Till Westmann has submitted this change and it was merged. Change subject: Fix Upsert Pipeline ......................................................................
Fix Upsert Pipeline Change-Id: I5c19d448f9664ecaeac600668a6dbdcf40673c56 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1308 Reviewed-by: Ian Maxon <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java 11 files changed, 370 insertions(+), 222 deletions(-) Approvals: Ian Maxon: Looks good to me, but someone else must approve Till Westmann: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java index 304eb0c..64cca8c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java @@ -43,11 +43,6 @@ this.fta = fta; } - public void prettyPrint(ByteBuffer frame) { - fta.reset(frame); - fta.prettyPrint(); - } - @Override public ByteBuffer handle(HyracksDataException th, ByteBuffer frame) { try { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java index e05aa25..f6eeb66 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java @@ -26,10 +26,10 @@ import java.util.Map; import org.apache.asterix.common.config.AsterixStorageProperties; -import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.context.TransactionSubsystemProvider; @@ -42,8 +42,8 @@ import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; +import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; @@ -1097,7 +1097,7 @@ @Override public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime( - IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, + IDataSource<AqlSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys, List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException { @@ -1119,22 +1119,22 @@ int i = 0; // set the keys' permutations for (LogicalVariable varKey : primaryKeys) { - int idx = propagatedSchema.findVariable(varKey); + int idx = inputSchema.findVariable(varKey); fieldPermutation[i] = idx; bloomFilterKeyFields[i] = i; i++; } // set the record permutation - fieldPermutation[i++] = propagatedSchema.findVariable(payload); + fieldPermutation[i++] = inputSchema.findVariable(payload); // set the filters' permutations. if (numFilterFields > 0) { - int idx = propagatedSchema.findVariable(filterKeys.get(0)); + int idx = inputSchema.findVariable(filterKeys.get(0)); fieldPermutation[i++] = idx; } if (additionalNonFilterFields != null) { for (LogicalVariable var : additionalNonFilterFields) { - int idx = propagatedSchema.findVariable(var); + int idx = inputSchema.findVariable(var); fieldPermutation[i++] = idx; } } @@ -1195,22 +1195,19 @@ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; - for (int j = 0; j < recordDesc.getFieldCount(); j++) { - outputTypeTraits[j] = recordDesc.getTypeTraits()[j]; - outputSerDes[j] = recordDesc.getFields()[j]; - } - outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils - .getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType); - outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils - .getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType); + // add the previous record first + int f = 0; + outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType); + f++; + // add the previous meta second if (dataset.hasMetaPart()) { - outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat() - .getSerdeProvider().getSerializerDeserializer(metaItemType); - outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat() - .getTypeTraitProvider().getTypeTrait(metaItemType); + outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer( + metaItemType); + outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType); + f++; } - + // add the previous filter third int fieldIdx = -1; if (numFilterFields > 0) { String filterField = DatasetUtils.getFilterField(dataset).get(0); @@ -1220,10 +1217,15 @@ } } fieldIdx = i; - outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider() - .getTypeTrait(itemType.getFieldTypes()[fieldIdx]); - outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider() + outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType + .getFieldTypes()[fieldIdx]); + outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider() .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]); + f++; + } + for (int j = 0; j < recordDesc.getFieldCount(); j++) { + outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j]; + outputSerDes[j + f] = recordDesc.getFields()[j]; } RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java index afd6019..96f9e76 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java @@ -78,6 +78,8 @@ private ARecordPointable recPointable; private DataOutput prevDos; private final boolean hasMeta; + private final int filterFieldIndex; + private final int metaFieldIndex; public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys, @@ -93,6 +95,8 @@ key.setFieldPermutation(searchKeyPermutations); hasMeta = (fieldPermutation.length > numOfPrimaryKeys + 1) && (filterFieldIndex < 0 || (filterFieldIndex >= 0 && (fieldPermutation.length > numOfPrimaryKeys + 2))); + this.metaFieldIndex = numOfPrimaryKeys + 1; + this.filterFieldIndex = numOfPrimaryKeys + (hasMeta ? 2 : 1); if (filterFieldIndex >= 0) { isFiltered = true; this.recordType = recordType; @@ -101,7 +105,6 @@ this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0)); this.prevDos = prevRecWithPKWithFilterValue.getDataOutput(); } - } // we have the permutation which has [pk locations, record location, optional:filter-location] @@ -141,8 +144,8 @@ .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this)); cursor = indexAccessor.createSearchCursor(false); frameTuple = new FrameTupleReference(); - IAsterixAppRuntimeContext runtimeCtx = - (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); + IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext() + .getApplicationContext().getApplicationObject(); AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, runtimeCtx.getTransactionSubsystem().getLogManager()); } catch (Exception e) { @@ -156,40 +159,11 @@ searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp); } - private void writeOutput(int tupleIndex, boolean recordWasInserted) throws IOException { - boolean recordWasDeleted = prevTuple != null; - tb.reset(); + private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException { frameTuple.reset(accessor, tupleIndex); for (int i = 0; i < frameTuple.getFieldCount(); i++) { dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i)); tb.addFieldEndOffset(); - } - if (recordWasDeleted) { - dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys), - prevTuple.getFieldLength(numOfPrimaryKeys)); - tb.addFieldEndOffset(); - // if has meta, then append meta - if (hasMeta) { - dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1), - prevTuple.getFieldLength(numOfPrimaryKeys + 1)); - tb.addFieldEndOffset(); - } - // if with filters, append the filter - if (isFiltered) { - dos.write(prevTuple.getFieldData(numOfPrimaryKeys + (hasMeta ? 2 : 1)), - prevTuple.getFieldStart(numOfPrimaryKeys + (hasMeta ? 2 : 1)), - prevTuple.getFieldLength(numOfPrimaryKeys + (hasMeta ? 2 : 1))); - tb.addFieldEndOffset(); - } - } else { - addNullField(); - if (hasMeta) { - addNullField(); - } - // if with filters, append null - if (isFiltered) { - addNullField(); - } } if (recordWasInserted || recordWasDeleted) { FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); @@ -214,6 +188,7 @@ int i = 0; try { while (i < tupleCount) { + tb.reset(); boolean recordWasInserted = false; tuple.reset(accessor, i); resetSearchPredicate(i); @@ -222,10 +197,26 @@ cursor.next(); prevTuple = cursor.getTuple(); cursor.reset(); - modCallback.setOp(Operation.DELETE); if (isFiltered) { prevTuple = getPrevTupleWithFilter(prevTuple); } + dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys), + prevTuple.getFieldLength(numOfPrimaryKeys)); + tb.addFieldEndOffset(); + // if has meta, then append meta + if (hasMeta) { + dos.write(prevTuple.getFieldData(metaFieldIndex), prevTuple.getFieldStart(metaFieldIndex), + prevTuple.getFieldLength(metaFieldIndex)); + tb.addFieldEndOffset(); + } + // if with filters, append the filter + if (isFiltered) { + dos.write(prevTuple.getFieldData(filterFieldIndex), + prevTuple.getFieldStart(filterFieldIndex), + prevTuple.getFieldLength(filterFieldIndex)); + tb.addFieldEndOffset(); + } + modCallback.setOp(Operation.DELETE); if (i == 0) { lsmAccessor.delete(prevTuple); } else { @@ -233,6 +224,14 @@ } } else { prevTuple = null; + addNullField(); + if (hasMeta) { + addNullField(); + } + // if with filters, append null + if (isFiltered) { + addNullField(); + } cursor.reset(); } if (!isNull(tuple, numOfPrimaryKeys)) { @@ -244,7 +243,7 @@ } recordWasInserted = true; } - writeOutput(i, recordWasInserted); + writeOutput(i, recordWasInserted, prevTuple != null); i++; } appender.write(writer, true); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java index 68710a5..6947942 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java @@ -193,7 +193,7 @@ public IFunctionInfo lookupFunction(FunctionIdentifier fid); public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource, - IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys, + IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys, LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields, List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java index 5dc327a..9838c12 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java @@ -84,7 +84,6 @@ @Override public void recomputeSchema() throws AlgebricksException { schema = new ArrayList<LogicalVariable>(); - schema.addAll(inputs.get(0).getValue().getSchema()); if (operation == Kind.UPSERT) { // The upsert case also produces the previous record schema.add(prevRecordVar); @@ -95,6 +94,7 @@ schema.add(prevFilterVar); } } + schema.addAll(inputs.get(0).getValue().getSchema()); } public void getProducedVariables(Collection<LogicalVariable> producedVariables) { @@ -146,7 +146,6 @@ @Override public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) throws AlgebricksException { - target.addAllVariables(sources[0]); if (operation == Kind.UPSERT) { target.addVariable(prevRecordVar); if (prevAdditionalNonFilteringVars != null) { @@ -158,6 +157,7 @@ target.addVariable(prevFilterVar); } } + target.addAllVariables(sources[0]); } }; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java index 28f4e5e..59ccd84 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java @@ -34,11 +34,11 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -295,7 +295,12 @@ @Override public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException { - standardLayout(op); + // produced first + VariableUtilities.getProducedVariables(op, schemaVariables); + // then propagated + for (Mutable<ILogicalOperator> c : op.getInputs()) { + VariableUtilities.getLiveVariables(c.getValue(), schemaVariables); + } return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java index 3c9cddf..6ded4a3 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java @@ -114,7 +114,7 @@ runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload, additionalFilteringKeys, inputDesc, context, spec); } else if (operation == Kind.UPSERT) { - runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload, + runtimeAndConstraints = mp.getUpsertRuntime(dataSource, inputSchemas[0], typeEnv, keys, payload, additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec); } else { throw new AlgebricksException("Unsupported Operation " + operation); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java index d99e2f2..cefada7 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java @@ -18,18 +18,12 @@ */ package org.apache.hyracks.dataflow.common.comm.io; -import java.io.DataInputStream; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import org.apache.hyracks.api.comm.FrameConstants; import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.util.IntSerDeUtils; /** @@ -118,132 +112,8 @@ return getFieldCount() * FrameConstants.SIZE_LEN; } - public void prettyPrint(String prefix) { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - int tc = getTupleCount(); - StringBuilder sb = new StringBuilder(); - sb.append(prefix).append("TC: " + tc).append("\n"); - for (int i = 0; i < tc; ++i) { - prettyPrint(i, bbis, dis, sb); - } - System.err.println(sb.toString()); - } - - public void prettyPrint() { - prettyPrint(""); - } - - protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) { - sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")["); - for (int j = 0; j < getFieldCount(); ++j) { - sb.append(" "); - if (j > 0) { - sb.append("|"); - } - sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") "); - sb.append("{"); - bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j)); - try { - sb.append(recordDescriptor.getFields()[j].deserialize(dis)); - } catch (Exception e) { - e.printStackTrace(); - sb.append("Failed to deserialize field" + j); - } - sb.append("}"); - } - sb.append("\n"); - } - - public void prettyPrint(int tid) { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - StringBuilder sb = new StringBuilder(); - prettyPrint(tid, bbis, dis, sb); - System.err.println(sb.toString()); - } - @Override public int getFieldCount() { return recordDescriptor.getFieldCount(); - } - - /* - * The two methods below can be used for debugging. - * They are safe as they don't print records. Printing records - * using IserializerDeserializer can print incorrect results or throw exceptions. - * A better way yet would be to use record pointable. - */ - public void prettyPrint(String prefix, int[] recordFields) throws IOException { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - int tc = getTupleCount(); - StringBuilder sb = new StringBuilder(); - sb.append(prefix).append("TC: " + tc).append("\n"); - for (int i = 0; i < tc; ++i) { - prettyPrint(i, bbis, dis, sb, recordFields); - } - System.err.println(sb.toString()); - } - - public void prettyPrint(int tIdx, int[] recordFields) throws IOException { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - StringBuilder sb = new StringBuilder(); - prettyPrint(tIdx, bbis, dis, sb, recordFields); - System.err.println(sb.toString()); - } - - public void prettyPrint(ITupleReference tuple, int fieldsIdx, int descIdx) throws HyracksDataException { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - StringBuilder sb = new StringBuilder(); - sb.append("["); - sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", " - + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") "); - sb.append("{"); - ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx)); - bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx)); - sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis)); - sb.append("}"); - sb.append("\n"); - System.err.println(sb.toString()); - } - - public void prettyPrint(ITupleReference tuple, int[] descF) throws HyracksDataException { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - StringBuilder sb = new StringBuilder(); - sb.append("["); - for (int j = 0; j < descF.length; ++j) { - sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", " - + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") "); - sb.append("{"); - ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j)); - bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j)); - sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis)); - sb.append("}"); - } - sb.append("\n"); - System.err.println(sb.toString()); - } - - protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb, - int[] recordFields) throws IOException { - Arrays.sort(recordFields); - sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")["); - for (int j = 0; j < getFieldCount(); ++j) { - sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") "); - sb.append("{"); - bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j)); - if (Arrays.binarySearch(recordFields, j) >= 0) { - sb.append("{a record field: only print using pointable:"); - sb.append("tag->" + dis.readByte() + "}"); - } else { - sb.append(recordDescriptor.getFields()[j].deserialize(dis)); - } - sb.append("}"); - } - sb.append("\n"); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java index d05b3ed..874ac46 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java @@ -18,24 +18,10 @@ */ package org.apache.hyracks.dataflow.common.comm.io; -import java.io.DataInputStream; - -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; - public class ResultFrameTupleAccessor extends FrameTupleAccessor { public ResultFrameTupleAccessor() { super(null); - } - - @Override - protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) { - sb.append(tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")["); - - bbis.setByteBuffer(getBuffer(), getTupleStartOffset(tid)); - sb.append(dis); - - sb.append("]\n"); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java new file mode 100644 index 0000000..aa27c42 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.common.util; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +/** + * A Util class used for inspecting frames + * for debugging purposes + */ +public class FrameDebugUtils { + private FrameDebugUtils() { + } + + /** + * Debugging method + * @param fta + * @param recordDescriptor + * @param prefix + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix) { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + int tc = fta.getTupleCount(); + StringBuilder sb = new StringBuilder(); + sb.append(prefix).append("TC: " + tc).append("\n"); + for (int i = 0; i < tc; ++i) { + prettyPrint(fta, recordDescriptor, i, bbis, dis, sb); + } + System.err.println(sb.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Debugging method + * @param fta + * @param recordDescriptor + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor) { + prettyPrint(fta, recordDescriptor, ""); + } + + /** + * Debugging method + * @param fta + * @param operator + */ + public void prettyPrintTags(IFrameTupleAccessor fta, String operator) { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + int tc = fta.getTupleCount(); + StringBuilder sb = new StringBuilder(); + sb.append(operator + ":"); + sb.append("TC: " + tc).append("\n"); + for (int i = 0; i < tc; ++i) { + prettyPrintTag(fta, i, bbis, dis, sb); + } + System.err.println(sb.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Debugging method + * @param fta + * @param tid + * @param bbis + * @param dis + * @param sb + */ + protected void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream bbis, DataInputStream dis, + StringBuilder sb) { + sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")["); + for (int j = 0; j < fta.getFieldCount(); ++j) { + sb.append(" "); + if (j > 0) { + sb.append("|"); + } + sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") "); + sb.append("{"); + sb.append(Byte.toString(fta.getBuffer().array()[fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + + fta.getFieldStartOffset(tid, j)])); + sb.append("}"); + } + sb.append("\n"); + } + + /** + * Debugging method + * @param fta + * @param recordDescriptor + * @param tid + * @param bbis + * @param dis + * @param sb + */ + protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid, + ByteBufferInputStream bbis, DataInputStream dis, + StringBuilder sb) { + sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")["); + for (int j = 0; j < fta.getFieldCount(); ++j) { + sb.append(" "); + if (j > 0) { + sb.append("|"); + } + sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") "); + sb.append("{"); + bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta + .getFieldStartOffset(tid, j)); + try { + sb.append(recordDescriptor.getFields()[j].deserialize(dis)); + } catch (Exception e) { + e.printStackTrace(); + sb.append("Failed to deserialize field" + j); + } + sb.append("}"); + } + sb.append("\n"); + } + + + /** + * Debugging method + * @param fta + * @param recordDescriptor + * @param tid + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid) { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + StringBuilder sb = new StringBuilder(); + prettyPrint(fta, recordDescriptor, tid, bbis, dis, sb); + System.err.println(sb.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Debugging method + * They are safe as they don't print records. Printing records + * using IserializerDeserializer can print incorrect results or throw exceptions. + * A better way yet would be to use record pointable. + * @param fta + * @param recordDescriptor + * @param prefix + * @param recordFields + * @throws IOException + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix, + int[] recordFields) throws IOException { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + int tc = fta.getTupleCount(); + StringBuilder sb = new StringBuilder(); + sb.append(prefix).append("TC: " + tc).append("\n"); + for (int i = 0; i < tc; ++i) { + prettyPrint(fta, recordDescriptor, i, bbis, dis, sb, recordFields); + } + System.err.println(sb.toString()); + } + } + + /** + * Debugging method + * @param fta + * @param recordDescriptor + * @param tIdx + * @param recordFields + * @throws IOException + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tIdx, int[] recordFields) + throws IOException { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + StringBuilder sb = new StringBuilder(); + prettyPrint(fta, recordDescriptor, tIdx, bbis, dis, sb, recordFields); + System.err.println(sb.toString()); + } + } + + /** + * Debugging method + * @param tuple + * @param fieldsIdx + * @param descIdx + * @throws HyracksDataException + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple, + int fieldsIdx, int descIdx) + throws HyracksDataException { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", " + + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") "); + sb.append("{"); + ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx)); + bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx)); + sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis)); + sb.append("}"); + sb.append("\n"); + System.err.println(sb.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Debugging method + * @param tuple + * @param descF + * @throws HyracksDataException + */ + public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple, + int[] descF) throws HyracksDataException { + try (ByteBufferInputStream bbis = new ByteBufferInputStream(); + DataInputStream dis = new DataInputStream(bbis)) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int j = 0; j < descF.length; ++j) { + sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", " + + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") "); + sb.append("{"); + ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j)); + bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j)); + sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis)); + sb.append("}"); + } + sb.append("\n"); + System.err.println(sb.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Debugging method + * @param fta + * @param recordDescriptor + * @param tid + * @param bbis + * @param dis + * @param sb + * @param recordFields + * @throws IOException + */ + protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid, + ByteBufferInputStream bbis, DataInputStream dis, + StringBuilder sb, + int[] recordFields) throws IOException { + Arrays.sort(recordFields); + sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")["); + for (int j = 0; j < fta.getFieldCount(); ++j) { + sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") "); + sb.append("{"); + bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta + .getFieldSlotsLength() + fta.getFieldStartOffset(tid, j)); + if (Arrays.binarySearch(recordFields, j) >= 0) { + sb.append("{a record field: only print using pointable:"); + sb.append("tag->" + dis.readByte() + "}"); + } else { + sb.append(recordDescriptor.getFields()[j].deserialize(dis)); + } + sb.append("}"); + } + sb.append("\n"); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java index 567b7df..c68d59d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java @@ -31,7 +31,6 @@ import static org.junit.Assert.assertTrue; import java.io.DataInputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -39,8 +38,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; - -import junit.extensions.PA; import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.IFrame; @@ -61,6 +58,8 @@ import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader; import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame; import org.junit.Test; + +import junit.extensions.PA; public class RunMergingFrameReaderTest { static IBinaryComparator[] Comparators = new IBinaryComparator[] { ComparatorFactories[0].createBinaryComparator(), @@ -123,12 +122,6 @@ } // printFrame(frame.getBuffer()); return true; - } - - private void printFrame(ByteBuffer buffer) { - FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc); - fta.reset(buffer); - fta.prettyPrint(); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/1308 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I5c19d448f9664ecaeac600668a6dbdcf40673c56 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
