From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458 )
Change subject: WIP: commit runtime changes
......................................................................
WIP: commit runtime changes
Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71
---
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
5 files changed, 42 insertions(+), 12 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/58/17458/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a65c898..bc0527a 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -854,8 +854,10 @@
for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
pkFieldsInCommitOp[i] = start++;
}
+ ITuplePartitionerFactory partitionerFactory =
+ new FieldHashPartitionerFactory(pkFieldsInCommitOp,
pkHashFunFactories, numPartitions);
CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx),
dataset.getDatasetId(), pkFieldsInCommitOp,
- true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
+ true, ctx.getTaskAttemptId().getTaskId().getPartition(), true,
partitionerFactory);
insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
return Pair.of(insertOp, commitOp);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index f87757c..2ee6384 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -36,8 +36,10 @@
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
@@ -58,11 +60,14 @@
public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider
metadataProvider, int[] keyFieldPermutation,
boolean isSink) throws AlgebricksException {
return new IPushRuntimeFactory() {
+ final ITuplePartitionerFactory partitionerFactory = new
FieldHashPartitionerFactory(keyFieldPermutation,
+ getPrimaryHashFunctionFactories(metadataProvider),
getDatasetPartitions(metadataProvider).length);
+
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx)
throws HyracksDataException {
- return new IPushRuntime[] {
- new CommitRuntime(ctx, new
TxnId(ctx.getJobletContext().getJobId().getId()), getDatasetId(),
- keyFieldPermutation, true,
ctx.getTaskAttemptId().getTaskId().getPartition(), true) };
+ return new IPushRuntime[] { new CommitRuntime(ctx, new
TxnId(ctx.getJobletContext().getJobId().getId()),
+ getDatasetId(), keyFieldPermutation, true,
ctx.getTaskAttemptId().getTaskId().getPartition(),
+ true, partitionerFactory) };
}
};
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index c0f2ddd..4b225ec 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -99,11 +99,13 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -631,8 +633,11 @@
public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider
metadataProvider,
int[] primaryKeyFieldPermutation, boolean isSink) throws
AlgebricksException {
int[] datasetPartitions = getDatasetPartitions(metadataProvider);
+ IBinaryHashFunctionFactory[] pkHashFunFactories =
getPrimaryHashFunctionFactories(metadataProvider);
+ ITuplePartitionerFactory partitionerFactory = new
FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+ pkHashFunFactories, datasetPartitions.length);
return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
metadataProvider.isWriteTransaction(),
- datasetPartitions, isSink);
+ datasetPartitions, isSink, partitionerFactory);
}
public IFrameOperationCallbackFactory
getFrameOpCallbackFactory(MetadataProvider mdProvider) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 2692cc7..a338b15 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -34,6 +34,8 @@
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -56,13 +58,15 @@
protected final boolean isWriteTransaction;
protected final long[] longHashes;
protected final IHyracksTaskContext ctx;
+ protected final ITuplePartitioner partitioner;
protected final int resourcePartition;
protected ITransactionContext transactionContext;
protected LogRecord logRecord;
protected final boolean isSink;
public CommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId,
int[] primaryKeyFields,
- boolean isWriteTransaction, int resourcePartition, boolean isSink)
{
+ boolean isWriteTransaction, int resourcePartition, boolean isSink,
+ ITuplePartitionerFactory partitionerFactory) {
this.ctx = ctx;
INcApplicationContext appCtx =
(INcApplicationContext)
ctx.getJobletContext().getServiceContext().getApplicationContext();
@@ -75,6 +79,7 @@
this.isWriteTransaction = isWriteTransaction;
this.resourcePartition = resourcePartition;
this.isSink = isSink;
+ this.partitioner = partitionerFactory.createPartitioner(ctx);
longHashes = new long[2];
}
@@ -102,7 +107,7 @@
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
try {
- formLogRecord(buffer, t);
+ formLogRecord(tAccess, t);
logMgr.log(logRecord);
if (!isSink) {
appendTupleToFrame(t);
@@ -130,10 +135,11 @@
TransactionUtil.formMarkerLogRecord(logRecord, transactionContext,
datasetId, resourcePartition, marker);
}
- protected void formLogRecord(ByteBuffer buffer, int t) {
+ protected void formLogRecord(FrameTupleAccessor accessor, int t) throws
HyracksDataException {
int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+ int partition = partitioner.partition(accessor, t);
TransactionUtil.formEntityCommitLogRecord(logRecord,
transactionContext, datasetId, pkHash, tRef,
- primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
+ primaryKeyFields, partition, LogType.ENTITY_COMMIT);
}
protected int computePrimaryKeyHashValue(ITupleReference tuple, int[]
primaryKeyFields) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 708e8dc..e84287d 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -23,13 +23,15 @@
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
public class CommitRuntimeFactory extends AbstractPushRuntimeFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ protected final ITuplePartitionerFactory partitionerFactory;
protected final int datasetId;
protected final int[] primaryKeyFields;
protected final boolean isWriteTransaction;
@@ -37,7 +39,8 @@
protected final boolean isSink;
public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean
isWriteTransaction,
- int[] datasetPartitions, boolean isSink) {
+ int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory
partitionerFactory) {
+ this.partitionerFactory = partitionerFactory;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.isWriteTransaction = isWriteTransaction;
@@ -55,6 +58,6 @@
IJobletEventListenerFactory fact =
ctx.getJobletContext().getJobletEventListenerFactory();
return new IPushRuntime[] { new CommitRuntime(ctx,
((IJobEventListenerFactory) fact).getTxnId(datasetId),
datasetId, primaryKeyFields, isWriteTransaction,
-
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) };
+
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink,
partitionerFactory) };
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71
Gerrit-Change-Number: 17458
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange