abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1618
Change subject: Ensure nextFrame and flush are not called in a failed pipeline
......................................................................
Ensure nextFrame and flush are not called in a failed pipeline
Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
---
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
22 files changed, 115 insertions(+), 60 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/18/1618/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 2eca55d..296a3ed 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -72,7 +72,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
RecordDescriptor inputRecDesc =
recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
@@ -102,7 +102,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
currentTupleIdx = 0;
lastFlushedTupleIdx = 0;
flushedPartialTuples = false;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 9982477..7b11cde 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -91,7 +91,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
synchronized (writer) {
writer.open();
}
@@ -135,7 +135,7 @@
}
@Override
- public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer frame) throws HyracksDataException {
try {
total++;
if (consumer.cause() != null) {
@@ -390,7 +390,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void doFlush() throws HyracksDataException {
synchronized (writer) {
writer.flush();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
index 1bdc7e1..a8ca426 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
@@ -35,12 +35,12 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
writer.open();
}
@Override
- public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer frame) throws HyracksDataException {
while (frame != null) {
try {
writer.nextFrame(frame);
@@ -65,7 +65,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
writer.flush();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
index 69a2020..6e2b190 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
@@ -45,7 +45,7 @@
// We override the open function to search a specific version of the index
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
writer.open();
ExternalBTreeWithBuddyDataflowHelper dataFlowHelper =
(ExternalBTreeWithBuddyDataflowHelper) indexHelper;
accessor = new FrameTupleAccessor(inputRecDesc);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index c95a4a7..5e4db84 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -54,7 +54,7 @@
// when creating the bulkLoader, it creates a transaction bulk loader
// It uses the bulkLoader to insert delete tuples for the deleted files
@Override
- public void open() throws HyracksDataException {
+ public void doOpen() throws HyracksDataException {
RecordDescriptor recDesc =
recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(recDesc);
indexHelper.open();
@@ -76,7 +76,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index d4718a9..8b888e1 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -78,7 +78,7 @@
private boolean indexOpen = false;
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
try {
adapter = adapterFactory.createAdapter(ctx, partition,
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
snapshotAccessor, writer);
@@ -122,7 +122,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws
HyracksDataException {
try {
adapter.nextFrame(buffer);
} catch (Throwable th) {
@@ -131,7 +131,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
adapter.flush();
}
};
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
index 237e2c0..e3415cc 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
@@ -44,7 +44,7 @@
// We override this method to specify the searched version of the index
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
writer.open();
accessor = new FrameTupleAccessor(inputRecDesc);
indexHelper.open();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index bdc11f5..84176cb 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -73,12 +73,12 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void doOpen() throws HyracksDataException {
writer.open();
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
writer.nextFrame(buffer);
}
@@ -88,7 +88,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void doFlush() throws HyracksDataException {
writer.flush();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index fbdbece..5fa03c3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -113,7 +113,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
ActiveRuntimeId runtimeId = new
ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString(), partition);
try {
initializeNewFeedRuntime(runtimeId);
@@ -138,7 +138,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
try {
FeedUtils.processFeedMessage(buffer, message, fta);
writer.nextFrame(buffer);
@@ -161,7 +161,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
writer.flush();
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 0bb27db..d3e5676 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -107,9 +107,9 @@
}
@Override
- public void open() throws HyracksDataException {
- ActiveRuntimeId runtimeId = new
ActiveRuntimeId(connectionId.getFeedId(),
- runtimeType.toString() + "." + targetId, partition);
+ protected void doOpen() throws HyracksDataException {
+ ActiveRuntimeId runtimeId =
+ new ActiveRuntimeId(connectionId.getFeedId(),
runtimeType.toString() + "." + targetId, partition);
try {
initializeNewFeedRuntime(runtimeId);
insertOperator.open();
@@ -123,8 +123,7 @@
fta = new
FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(),
0));
insertOperator.setOutputFrameWriter(0, writer, recordDesc);
if (insertOperator instanceof LSMInsertDeleteOperatorNodePushable) {
- LSMInsertDeleteOperatorNodePushable indexOp =
- (LSMInsertDeleteOperatorNodePushable) insertOperator;
+ LSMInsertDeleteOperatorNodePushable indexOp =
(LSMInsertDeleteOperatorNodePushable) insertOperator;
if (!indexOp.isPrimary()) {
writer = insertOperator;
return;
@@ -139,7 +138,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
try {
FeedUtils.processFeedMessage(buffer, message, fta);
writer.nextFrame(buffer);
@@ -160,7 +159,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
writer.flush();
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 1f18c97..b51db6f 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -125,7 +125,7 @@
// have been obtained through searchForUpsert operation
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
RecordDescriptor inputRecDesc =
recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
@@ -202,7 +202,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor)
indexAccessor;
int tupleCount = accessor.getTupleCount();
@@ -333,7 +333,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void doFlush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 0db5ff0..8b46026 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -65,8 +65,8 @@
}
@Override
- public void open() throws HyracksDataException {
- super.open();
+ protected void doOpen() throws HyracksDataException {
+ super.doOpen();
abstractModCallback = (AbstractIndexModificationOperationCallback)
modCallback;
}
@@ -96,7 +96,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
index ec31eb7..5d27b07 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
@@ -18,10 +18,19 @@
*/
package org.apache.hyracks.dataflow.std.base;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class AbstractUnaryInputUnaryOutputOperatorNodePushable
extends AbstractUnaryOutputOperatorNodePushable
implements IFrameWriter {
+ private static final Logger LOGGER =
+
Logger.getLogger(AbstractUnaryInputUnaryOutputOperatorNodePushable.class.getName());
+ private boolean failed = false;
+
@Override
public final IFrameWriter getInputFrameWriter(int index) {
return this;
@@ -31,4 +40,46 @@
public final int getInputArity() {
return 1;
}
+
+ @Override
+ public final void open() throws HyracksDataException {
+ failed = true;
+ doOpen();
+ failed = false;
+ }
+
+ protected abstract void doOpen() throws HyracksDataException;
+
+ @Override
+ public final void nextFrame(ByteBuffer buffer) throws HyracksDataException
{
+ if (failed) {
+ LOGGER.log(Level.WARNING, "next Frame called on a failed writer");
+ return;
+ }
+ failed = true;
+ doNextFrame(buffer);
+ failed = false;
+ }
+
+ protected abstract void doNextFrame(ByteBuffer buffer) throws
HyracksDataException;
+
+ @Override
+ public final void flush() throws HyracksDataException {
+ if (failed) {
+ LOGGER.log(Level.WARNING, "flush called on a failed writer");
+ return;
+ }
+ failed = true;
+ doFlush();
+ failed = false;
+ }
+
+ public final void unfail() {
+ if (!failed) {
+ LOGGER.log(Level.WARNING, "unfail called on a not failed writer");
+ }
+ failed = false;
+ }
+
+ protected abstract void doFlush() throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 2acc4db..06e51e3 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -52,7 +52,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
final IBinaryComparator[] comparators = new
IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -63,7 +63,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
pgw.nextFrame(buffer);
}
@@ -78,7 +78,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
pgw.flush();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
index a07540b..3ba2a20 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
@@ -43,12 +43,12 @@
throws HyracksDataException {
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
writer.open();
}
@Override
- public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws
HyracksDataException {
writer.nextFrame(buffer);
}
@@ -63,7 +63,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
writer.flush();
}
};
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
index 084c9ab..396655d 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -54,7 +54,7 @@
private boolean finished;
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
fta = new FrameTupleAccessor(recordDescriptors[0]);
currentSize = 0;
finished = false;
@@ -62,7 +62,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws
HyracksDataException {
if (!finished) {
fta.reset(buffer);
int count = fta.getTupleCount();
@@ -94,7 +94,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
writer.flush();
}
};
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index fd4b094..8e3b5bc 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -95,14 +95,14 @@
private boolean failed = false;
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
state = new
MaterializerTaskState(ctx.getJobletContext().getJobId(),
new TaskId(getActivityId(), partition));
state.open(ctx);
}
@Override
- public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws
HyracksDataException {
state.appendFrame(buffer);
}
@@ -116,6 +116,11 @@
state.close();
state.writeOut(writer, new VSizeFrame(ctx), failed);
}
+
+ @Override
+ protected void doFlush() throws HyracksDataException {
+ // No op
+ }
};
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 6439279..e7d3d55 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -63,7 +63,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
RecordDescriptor recDesc =
recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(recDesc);
indexHelper.open();
@@ -77,7 +77,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
@@ -116,7 +116,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void doFlush() throws HyracksDataException {
writer.flush();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4f01978..d0470f7 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -70,7 +70,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
RecordDescriptor inputRecDesc =
recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
@@ -93,7 +93,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
@@ -167,7 +167,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
writer.flush();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index c089854..e8e8f81 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -107,7 +107,7 @@
protected abstract int getFieldCount();
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
writer.open();
indexHelper.open();
index = indexHelper.getIndexInstance();
@@ -174,7 +174,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
try {
@@ -190,7 +190,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ protected void doFlush() throws HyracksDataException {
appender.flush(writer);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index b51d132..a862357 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -37,8 +37,8 @@
protected FrameTupleAppender appender;
@Override
- public void open() throws HyracksDataException {
- super.open();
+ protected void doOpen() throws HyracksDataException {
+ super.doOpen();
appender = new FrameTupleAppender(writeBuffer);
}
@@ -49,7 +49,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
int nextFlushTupleIndex = 0;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 002457b..0f4e9bf 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -65,7 +65,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ protected void doOpen() throws HyracksDataException {
writer.open();
accessor = new FrameTupleAccessor(inputRecDesc);
builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
@@ -74,7 +74,7 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
@@ -166,7 +166,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void doFlush() throws HyracksDataException {
appender.flush(writer);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1618
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>