Yingyi Bu has submitted this change and it was merged.

Change subject: Change DataflowHelperFactory not to require Task Context
......................................................................
Change DataflowHelperFactory not to require Task Context Change-Id: I9dcd95dbefca131c4bbdb43306f00f6f8ea60800 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1758 Reviewed-by: Yingyi Bu <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java 20 files changed, 180 insertions(+), 20 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 23de847..5932aff 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -18,13 +18,24 @@ */ package org.apache.asterix.messaging; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import 
org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.INcResponse; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.cc.ClusterControllerService; @@ -35,6 +46,10 @@ private static final Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName()); private final ClusterControllerService ccs; + private final Map<Long, MutablePair<MutableInt, MutablePair<ResponseState, Object>>> handles = + new ConcurrentHashMap<>(); + private static final AtomicLong REQUEST_ID_GENERATOR = new AtomicLong(0); + private static final Object UNINITIALIZED = new Object(); public CCMessageBroker(ClusterControllerService ccs) { this.ccs = ccs; @@ -56,4 +71,75 @@ NodeControllerState state = nodeManager.getNodeControllerState(nodeId); state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); } + + public long newRequestId() { + return REQUEST_ID_GENERATOR.incrementAndGet(); + } + + @Override + public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? 
extends INcAddressedMessage> requests, + long timeout) throws Exception { + MutableInt numRequired = new MutableInt(0); + MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair = + MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED)); + pair.getKey().setValue(ncs.size()); + handles.put(reqId, pair); + try { + synchronized (pair) { + for (int i = 0; i < ncs.size(); i++) { + String nc = ncs.get(i); + INcAddressedMessage message = requests.get(i); + sendApplicationMessageToNC(message, nc); + } + long time = System.currentTimeMillis(); + while (pair.getLeft().getValue() > 0) { + try { + pair.wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + if (System.currentTimeMillis() - time > timeout && pair.getLeft().getValue() > 0) { + throw new RuntimeDataException(ErrorCode.NC_REQUEST_TIMEOUT, timeout / 1000.0); + } + } + } + MutablePair<ResponseState, Object> right = pair.getRight(); + switch (right.getKey()) { + case FAILURE: + throw HyracksDataException.create((Exception) right.getValue()); + case SUCCESS: + return right.getRight(); + default: + throw new RuntimeDataException(ErrorCode.COMPILATION_ILLEGAL_STATE, String.valueOf(right.getKey())); + } + } finally { + handles.remove(reqId); + } + } + + @Override + public void respond(Long reqId, INcResponse response) { + Pair<MutableInt, MutablePair<ResponseState, Object>> pair = handles.get(reqId); + if (pair != null) { + synchronized (pair) { + try { + MutablePair<ResponseState, Object> result = pair.getValue(); + switch (result.getKey()) { + case SUCCESS: + case UNINITIALIZED: + response.setResult(result); + break; + default: + break; + } + } finally { + // Decrement the response counter + MutableInt remainingResponses = pair.getKey(); + remainingResponses.setValue(remainingResponses.getValue() - 1); + pair.notifyAll(); + } + } + } + } } diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 7d4b41d..8c1ce4e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -447,7 +447,8 @@ public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo, IStorageComponentProvider storageComponentProvider) throws AlgebricksException { - return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider); + return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), + primaryIndexInfo.fileSplitProvider); } public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes, @@ -459,6 +460,6 @@ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators, storageComponentProvider); return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider) - .create(createTestContext(true), PARTITION); + .create(createTestContext(true).getJobletContext().getServiceContext(), PARTITION); } } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 912ac37..785135b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -63,6 +63,8 @@ public static final int TYPE_CONVERT_INTEGER_TARGET = 20; public static final int TYPE_CONVERT_OUT_OF_BOUND = 21; public static final int FIELD_SHOULD_BE_TYPED = 22; + public static 
final int NC_REQUEST_TIMEOUT = 23; + public static final int INSTANTIATION_ERROR = 100; // Compilation errors diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java index b2fde52..69c0ca0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java @@ -18,9 +18,16 @@ */ package org.apache.asterix.common.messaging.api; +import java.util.List; + import org.apache.hyracks.api.messages.IMessageBroker; public interface ICCMessageBroker extends IMessageBroker { + public enum ResponseState { + UNINITIALIZED, + SUCCESS, + FAILURE + } /** * Sends the passed message to the specified {@code nodeId} @@ -29,5 +36,24 @@ * @param nodeId * @throws Exception */ - public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception; + void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception; + + /** + * Sends the passed requests to all NCs and wait for the response + * + * @param ncs + * @param requests + * @param timeout + * @throws Exception + */ + Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? 
extends INcAddressedMessage> requests, + long timeout) throws Exception; + + /** + * respond to a sync request + * + * @param reqId + * @param response + */ + void respond(Long reqId, INcResponse response); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java new file mode 100644 index 0000000..e3c3d2b --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.messaging.api; + +import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState; +import org.apache.commons.lang3.tuple.MutablePair; + +@FunctionalInterface +public interface INcResponse { + /** + * Sets the response in the result mutable place holder + * adjust the response state as needed + * + * @param result + */ + void setResult(MutablePair<ResponseState, Object> result); + +} diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 9efb6b8..c118c36 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -56,6 +56,8 @@ 20 = Can't convert integer types. The target type should be one of %1$s. 21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s 22 = The accessed field is untyped, but should be typed +23 = %1$ss passed before getting back the responses from NCs + 100 = Unable to instantiate class %1$s # Compile-time check errors diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java index 95debe3..8d83b9a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java @@ -63,8 +63,8 @@ try { // perform operation on btrees for (int i = 0; i < treeIndexesDataflowHelperFactories.size(); i++) { - IIndexDataflowHelper indexHelper = - treeIndexesDataflowHelperFactories.get(i).create(ctx, 
partition); + IIndexDataflowHelper indexHelper = treeIndexesDataflowHelperFactories.get(i) + .create(ctx.getJobletContext().getServiceContext(), partition); performOpOnIndex(indexHelper, ctx); } } catch (Exception e) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java index 09a3c47..79dc396 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java @@ -66,7 +66,8 @@ @Override public void initialize() throws HyracksDataException { IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition); - IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition); + IIndexDataflowHelper indexHelper = + dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator(); // Build the index indexBuilder.build(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java index 94ef285..4bc2867 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java @@ -62,7 +62,8 @@ return new AbstractOperatorNodePushable() { @Override public void initialize() 
throws HyracksDataException { - final IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition); + final IIndexDataflowHelper indexHelper = + dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator(); // Open and get indexHelper.open(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java index 20744bc..6299982 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java @@ -61,7 +61,8 @@ throws HyracksDataException { // Create a file index accessor to be used for files lookup operations final ExternalFileIndexAccessor snapshotAccessor = new ExternalFileIndexAccessor( - dataflowHelperFactory.create(ctx, partition), searchOpCallbackFactory, version); + dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition), + searchOpCallbackFactory, version); return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { // The adapter that uses the file index along with the coming tuples to access files in HDFS private LookupAdapter<?> adapter; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java index 5348744..19b8a68 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java @@ -23,9 +23,11 @@ import java.util.List; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameTupleAppender; import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -301,11 +303,15 @@ private IHyracksTaskContext[] mockIHyracksTaskContext() throws HyracksDataException { IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class); + IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class); + INCServiceContext serviceCtx = Mockito.mock(INCServiceContext.class); Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer()); Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer()); Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE); Mockito.when(ctx.reallocateFrame(Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean())) .thenReturn(mockByteBuffer()); + Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); + Mockito.when(jobletCtx.getServiceContext()).thenReturn(serviceCtx); return new IHyracksTaskContext[] { ctx }; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java index 77d45f8..12065d6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java @@ -21,11 +21,11 @@ import java.io.Serializable; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; @FunctionalInterface public interface IIndexDataflowHelperFactory extends Serializable { - IIndexDataflowHelper create(final IHyracksTaskContext ctx, int partition) throws HyracksDataException; + IIndexDataflowHelper create(final INCServiceContext ctx, int partition) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java index 82fedb0..5fc07ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java @@ -48,7 +48,7 @@ IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc) throws HyracksDataException { this.ctx = ctx; - this.indexHelper = indexDataflowHelperFactory.create(ctx, partition); + this.indexHelper = indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.fillFactor = fillFactor; this.verifyInput = verifyInput; this.numElementsHint = numElementsHint; diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java index dd47154..4c811bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java @@ -18,7 +18,7 @@ */ package org.apache.hyracks.storage.am.common.dataflow; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.FileSplit; @@ -38,9 +38,9 @@ } @Override - public IIndexDataflowHelper create(IHyracksTaskContext ctx, int partition) throws HyracksDataException { + public IIndexDataflowHelper create(INCServiceContext ctx, int partition) throws HyracksDataException { FileSplit fileSplit = fileSplitProvider.getFileSplits()[partition]; FileReference resourceRef = fileSplit.getFileReference(ctx.getIoManager()); - return new IndexDataflowHelper(ctx.getJobletContext().getServiceContext(), storageMgr, resourceRef); + return new IndexDataflowHelper(ctx, storageMgr, resourceRef); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java index f6073a4..fce31ca 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java @@ -31,7 +31,7 @@ public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx, int partition) throws HyracksDataException { - this.indexHelper = indexHelperFactory.create(ctx, partition); + this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java index d41acdf..e80a837 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java @@ -63,7 +63,7 @@ IndexOperation op, IModificationOperationCallbackFactory modOpCallbackFactory, ITupleFilterFactory tupleFilterFactory) throws HyracksDataException { this.ctx = ctx; - this.indexHelper = indexHelperFactory.create(ctx, partition); + this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.modOpCallbackFactory = modOpCallbackFactory; this.tupleFilterFactory = tupleFilterFactory; this.inputRecDesc = inputRecDesc; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index 0352cea..b358f07 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -86,7 +86,7 @@ ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter) throws HyracksDataException { this.ctx = ctx; - this.indexHelper = indexHelperFactory.create(ctx, partition); + this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.retainInput = retainInput; this.retainMissing = retainMissing; this.appendIndexFilter = appendIndexFilter; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java index 6075c3d..bc7cb85 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java @@ -48,7 +48,7 @@ IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory) throws HyracksDataException { this.ctx = ctx; - this.treeIndexHelper = indexHelperFactory.create(ctx, partition); + this.treeIndexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.searchCallbackFactory = 
searchCallbackFactory; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java index 0210145..c00cecb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java @@ -49,7 +49,7 @@ IIndexDataflowHelperFactory indexHelperFactory, IStorageManager storageManager) throws HyracksDataException { this.ctx = ctx; - this.treeIndexHelper = indexHelperFactory.create(ctx, partition); + this.treeIndexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.storageManager = storageManager; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java index 5e4bc7d..5ff3308 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java @@ -35,7 +35,7 @@ public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory) throws HyracksDataException { - this.indexHelper = indexHelperFactory.create(ctx, partition); + 
this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/1758 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9dcd95dbefca131c4bbdb43306f00f6f8ea60800 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
