From Ali Alsuliman <[email protected]>:

Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458 )

Change subject: [ASTERIXDB-3144][RT] Make commit runtime support multiple partitions
......................................................................

[ASTERIXDB-3144][RT] Make commit runtime support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch changes the commit runtime to support operating on multiple
partitions. With this change, a single commit push runtime processes data
belonging to multiple partitions, routing each tuple to its owning storage
partition. This is a step towards compute/storage separation.

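For illustration, here is a minimal standalone sketch of the routing idea
(plain Java; the class and method names are hypothetical and merely stand in
for the Hyracks partitioner machinery): hash a tuple's primary key and use
the result to pick one of the dataset's storage partitions, which then
becomes the resource partition recorded for that tuple's entity commit.

// Illustrative sketch only; PartitionRoutingSketch and routeToPartition are
// hypothetical names, not part of AsterixDB or Hyracks.
import java.util.Arrays;

public class PartitionRoutingSketch {

    // Hashes the primary key and maps it to one of the dataset's storage
    // partitions, mirroring the per-tuple routing this patch adds.
    static int routeToPartition(Object[] primaryKey, int[] datasetPartitions) {
        int index = Math.floorMod(Arrays.hashCode(primaryKey), datasetPartitions.length);
        return datasetPartitions[index];
    }

    public static void main(String[] args) {
        int[] datasetPartitions = { 4, 5, 6, 7 }; // example: partitions handled by one task
        System.out.println(routeToPartition(new Object[] { "user-42" }, datasetPartitions));
        System.out.println(routeToPartition(new Object[] { "user-43" }, datasetPartitions));
    }
}

In the patch itself this role is played by FieldHashPartitionerFactory and
ITuplePartitioner, and CommitRuntime falls back to its fixed task partition
when no partitioner is supplied (see getResourcePartition in the diff below).
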
Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
5 files changed, 56 insertions(+), 12 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved; Verified

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a65c898..068c0ee 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -855,7 +855,7 @@
             pkFieldsInCommitOp[i] = start++;
         }
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
-                true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
+                true, ctx.getTaskAttemptId().getTaskId().getPartition(), true, null, null);
         insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
         commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
         return Pair.of(insertOp, commitOp);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index f87757c..f684405 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -58,11 +58,12 @@
     public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, int[] keyFieldPermutation,
             boolean isSink) throws AlgebricksException {
         return new IPushRuntimeFactory() {
+
             @Override
             public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-                return new IPushRuntime[] {
-                        new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()), getDatasetId(),
-                                keyFieldPermutation, true, ctx.getTaskAttemptId().getTaskId().getPartition(), true) };
+                return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()),
+                        getDatasetId(), keyFieldPermutation, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        true, null, null) };
             }
         };
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index c0f2ddd..4b225ec 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -99,11 +99,13 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -631,8 +633,11 @@
     public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
             int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
         int[] datasetPartitions = getDatasetPartitions(metadataProvider);
+        IBinaryHashFunctionFactory[] pkHashFunFactories = getPrimaryHashFunctionFactories(metadataProvider);
+        ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                pkHashFunFactories, datasetPartitions.length);
         return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, metadataProvider.isWriteTransaction(),
-                datasetPartitions, isSink);
+                datasetPartitions, isSink, partitionerFactory);
     }

     public IFrameOperationCallbackFactory getFrameOpCallbackFactory(MetadataProvider mdProvider) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 2692cc7..ff5ed5c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -34,6 +34,8 @@
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -56,14 +58,18 @@
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
     protected final IHyracksTaskContext ctx;
+    private final int[] datasetPartitions;
+    private final ITuplePartitioner partitioner;
     protected final int resourcePartition;
     protected ITransactionContext transactionContext;
     protected LogRecord logRecord;
     protected final boolean isSink;

     public CommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields,
-            boolean isWriteTransaction, int resourcePartition, boolean isSink) {
+            boolean isWriteTransaction, int resourcePartition, boolean isSink,
+            ITuplePartitionerFactory partitionerFactory, int[] datasetPartitions) {
         this.ctx = ctx;
+        this.datasetPartitions = datasetPartitions;
         INcApplicationContext appCtx =
                 (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
         this.transactionManager = appCtx.getTransactionSubsystem().getTransactionManager();
@@ -75,6 +81,7 @@
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
         this.isSink = isSink;
+        this.partitioner = partitionerFactory != null ? partitionerFactory.createPartitioner(ctx) : null;
         longHashes = new long[2];
     }

@@ -102,7 +109,7 @@
         for (int t = 0; t < nTuple; t++) {
             tRef.reset(tAccess, t);
             try {
-                formLogRecord(buffer, t);
+                formLogRecord(tAccess, t);
                 logMgr.log(logRecord);
                 if (!isSink) {
                     appendTupleToFrame(t);
@@ -130,10 +137,11 @@
         TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
     }

-    protected void formLogRecord(ByteBuffer buffer, int t) {
+    protected void formLogRecord(FrameTupleAccessor accessor, int t) throws HyracksDataException {
         int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+        int resource = getResourcePartition(accessor, t);
         TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
-                primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
+                primaryKeyFields, resource, LogType.ENTITY_COMMIT);
     }

     protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
@@ -141,6 +149,10 @@
         return Math.abs((int) longHashes[0]);
     }

+    protected int getResourcePartition(FrameTupleAccessor tupleAccessor, int tuple) throws HyracksDataException {
+        return partitioner != null ? datasetPartitions[partitioner.partition(tupleAccessor, tuple)] : resourcePartition;
+    }
+
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
         this.inputRecordDesc = recordDescriptor;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 708e8dc..269f686 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -23,13 +23,15 @@
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;

 public class CommitRuntimeFactory extends AbstractPushRuntimeFactory {

-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;

+    protected final ITuplePartitionerFactory partitionerFactory;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final boolean isWriteTransaction;
@@ -37,7 +39,8 @@
     protected final boolean isSink;

     public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
-            int[] datasetPartitions, boolean isSink) {
+            int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory partitionerFactory) {
+        this.partitionerFactory = partitionerFactory;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isWriteTransaction = isWriteTransaction;
@@ -55,6 +58,7 @@
         IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
         return new IPushRuntime[] { new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId),
                 datasetId, primaryKeyFields, isWriteTransaction,
-                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) };
+                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink, partitionerFactory,
+                datasetPartitions) };
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71
Gerrit-Change-Number: 17458
Gerrit-PatchSet: 4
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-MessageType: merged
