From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458 )
Change subject: [ASTERIXDB-3144][RT] Make commit runtime support multiple partitions ...................................................................... [ASTERIXDB-3144][RT] Make commit runtime support multiple partitions - user model changes: no - storage format changes: no - interface changes: no Details: This patch changes the commit runtime to support operating on multiple partitions. With this change, a commit push runtime will handle processing data belonging to multiple partitions. This is a step towards achieving compute/storage separation. Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java 5 files changed, 56 insertions(+), 12 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved; Verified diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index a65c898..068c0ee 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -855,7 +855,7 @@ pkFieldsInCommitOp[i] = start++; } CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), 
pkFieldsInCommitOp, - true, ctx.getTaskAttemptId().getTaskId().getPartition(), true); + true, ctx.getTaskAttemptId().getTaskId().getPartition(), true, null, null); insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc); commitOp.setInputRecordDescriptor(0, upsertOutRecDesc); return Pair.of(insertOp, commitOp); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java index f87757c..f684405 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java @@ -58,11 +58,12 @@ public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, int[] keyFieldPermutation, boolean isSink) throws AlgebricksException { return new IPushRuntimeFactory() { + @Override public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new IPushRuntime[] { - new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()), getDatasetId(), - keyFieldPermutation, true, ctx.getTaskAttemptId().getTaskId().getPartition(), true) }; + return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()), + getDatasetId(), keyFieldPermutation, true, ctx.getTaskAttemptId().getTaskId().getPartition(), + true, null, null) }; } }; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index c0f2ddd..4b225ec 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -99,11 +99,13 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; 
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; @@ -631,8 +633,11 @@ public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException { int[] datasetPartitions = getDatasetPartitions(metadataProvider); + IBinaryHashFunctionFactory[] pkHashFunFactories = getPrimaryHashFunctionFactories(metadataProvider); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation, + pkHashFunFactories, datasetPartitions.length); return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, metadataProvider.isWriteTransaction(), - datasetPartitions, isSink); + datasetPartitions, isSink, partitionerFactory); } public IFrameOperationCallbackFactory getFrameOpCallbackFactory(MetadataProvider mdProvider) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java index 2692cc7..ff5ed5c 100644 --- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java @@ -34,6 +34,8 @@ import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitioner; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.HyracksConstants; @@ -56,14 +58,18 @@ protected final boolean isWriteTransaction; protected final long[] longHashes; protected final IHyracksTaskContext ctx; + private final int[] datasetPartitions; + private final ITuplePartitioner partitioner; protected final int resourcePartition; protected ITransactionContext transactionContext; protected LogRecord logRecord; protected final boolean isSink; public CommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields, - boolean isWriteTransaction, int resourcePartition, boolean isSink) { + boolean isWriteTransaction, int resourcePartition, boolean isSink, + ITuplePartitionerFactory partitionerFactory, int[] datasetPartitions) { this.ctx = ctx; + this.datasetPartitions = datasetPartitions; INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); this.transactionManager = appCtx.getTransactionSubsystem().getTransactionManager(); @@ -75,6 +81,7 @@ this.isWriteTransaction = isWriteTransaction; this.resourcePartition = resourcePartition; this.isSink = isSink; + this.partitioner = partitionerFactory != null ? 
partitionerFactory.createPartitioner(ctx) : null; longHashes = new long[2]; } @@ -102,7 +109,7 @@ for (int t = 0; t < nTuple; t++) { tRef.reset(tAccess, t); try { - formLogRecord(buffer, t); + formLogRecord(tAccess, t); logMgr.log(logRecord); if (!isSink) { appendTupleToFrame(t); @@ -130,10 +137,11 @@ TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker); } - protected void formLogRecord(ByteBuffer buffer, int t) { + protected void formLogRecord(FrameTupleAccessor accessor, int t) throws HyracksDataException { int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields); + int resource = getResourcePartition(accessor, t); TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef, - primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT); + primaryKeyFields, resource, LogType.ENTITY_COMMIT); } protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) { @@ -141,6 +149,10 @@ return Math.abs((int) longHashes[0]); } + protected int getResourcePartition(FrameTupleAccessor tupleAccessor, int tuple) throws HyracksDataException { + return partitioner != null ? 
datasetPartitions[partitioner.partition(tupleAccessor, tuple)] : resourcePartition; + } + @Override public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { this.inputRecordDesc = recordDescriptor; diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 708e8dc..269f686 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -23,13 +23,15 @@ import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IJobletEventListenerFactory; public class CommitRuntimeFactory extends AbstractPushRuntimeFactory { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; + protected final ITuplePartitionerFactory partitionerFactory; protected final int datasetId; protected final int[] primaryKeyFields; protected final boolean isWriteTransaction; @@ -37,7 +39,8 @@ protected final boolean isSink; public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction, - int[] datasetPartitions, boolean isSink) { + int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory partitionerFactory) { + this.partitionerFactory = partitionerFactory; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.isWriteTransaction = isWriteTransaction; @@ -55,6 
+58,7 @@ IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); return new IPushRuntime[] { new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId, primaryKeyFields, isWriteTransaction, - datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) }; + datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink, partitionerFactory, + datasetPartitions) }; } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71 Gerrit-Change-Number: 17458 Gerrit-PatchSet: 4 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
