abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2463
Change subject: [NO ISSUE][ING] Follow the IFrameWriter contract in Feed Pipeline ...................................................................... [NO ISSUE][ING] Follow the IFrameWriter contract in Feed Pipeline Change-Id: Ife679fb9643dc6b39d035e0eecdb915b227503a5 --- D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java 22 files changed, 121 insertions(+), 672 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/63/2463/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java deleted file mode 100644 index 22d0d6b..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.api; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; - -public interface ITupleForwarder { - - public static final String FORWARD_POLICY = "forward-policy"; - - public enum TupleForwardPolicy { - FRAME_FULL, - COUNTER_TIMER_EXPIRED, - RATE_CONTROLLED, - FEED - } - - public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException; - - public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException; - - public void close() throws HyracksDataException; - -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java index bbd93c2..a324496 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java @@ -19,16 +19,13 @@ package org.apache.asterix.external.dataflow; import org.apache.asterix.external.api.IDataFlowController; -import org.apache.asterix.external.api.ITupleForwarder; import org.apache.hyracks.api.context.IHyracksTaskContext; public abstract class AbstractDataFlowController implements IDataFlowController { - protected final ITupleForwarder tupleForwarder; protected final IHyracksTaskContext ctx; - public AbstractDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder) { + public AbstractDataFlowController(IHyracksTaskContext ctx) { this.ctx = ctx; - this.tupleForwarder = tupleForwarder; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index 3437de1..54ecaee 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -25,32 +25,18 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; public abstract class AbstractFeedDataFlowController implements IDataFlowController { - protected final FeedTupleForwarder tupleForwarder; + protected TupleForwarder tupleForwarder; protected final IHyracksTaskContext ctx; protected final int numOfFields; protected final ArrayTupleBuilder tb; protected final FeedLogManager feedLogManager; protected boolean flushing; - public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, - FeedLogManager feedLogManager, int numOfFields) { + public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfFields) { this.feedLogManager = feedLogManager; this.numOfFields = numOfFields; this.ctx = ctx; - this.tupleForwarder = tupleForwarder; this.tb = new ArrayTupleBuilder(numOfFields); - } - - @Override - public boolean pause() { - tupleForwarder.pause(); - return true; - } - - @Override - public boolean resume() { - tupleForwarder.resume(); - return true; } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java index 2db17e2..b14722b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java @@ -30,11 +30,10 @@ private final IRecordWithPKDataParser<T> dataParser; - public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder, - final FeedLogManager feedLogManager, final int numOfOutputFields, - final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) - throws HyracksDataException { - super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader); + public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager, + final int numOfOutputFields, final IRecordWithPKDataParser<T> dataParser, + final IRecordReader<T> recordReader) throws HyracksDataException { + super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader); this.dataParser = dataParser; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java index 4447b28..621397b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java @@ -28,11 +28,10 @@ public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> { - public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder, - final FeedLogManager feedLogManager, final int numOfOutputFields, - final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader) - throws HyracksDataException { - super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader); + public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager, + final int numOfOutputFields, final IRecordWithMetadataParser<T> dataParser, + final IRecordReader<T> recordReader) throws HyracksDataException { + super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java deleted file mode 100644 index 10815d9..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.dataflow; - -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.ITupleForwarder; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class CounterTimerTupleForwarder implements ITupleForwarder { - - public static final String BATCH_SIZE = "batch-size"; - public static final String BATCH_INTERVAL = "batch-interval"; - - private static final Logger LOGGER = LogManager.getLogger(); - - private FrameTupleAppender appender; - private IFrame frame; - private IFrameWriter writer; - private int batchSize; - private long batchInterval; - private int tuplesInFrame = 0; - private TimeBasedFlushTask flushTask; - private Timer timer; - private Object lock = new Object(); - private boolean activeTimer = false; - - private CounterTimerTupleForwarder(int batchSize, long batchInterval) { - this.batchSize = batchSize; - this.batchInterval = batchInterval; - if (batchInterval > 0L) { - activeTimer = true; - } - } - - // Factory method - public static CounterTimerTupleForwarder create(Map<String, String> configuration) { - int batchSize = -1; - long batchInterval = 0L; - String propValue = configuration.get(BATCH_SIZE); - if (propValue != null) { - batchSize = Integer.parseInt(propValue); - } - propValue = configuration.get(BATCH_INTERVAL); - if (propValue != null) { - batchInterval = Long.parseLong(propValue); - } - return new CounterTimerTupleForwarder(batchSize, batchInterval); - } - - @Override - public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException { - this.appender = new FrameTupleAppender(); - this.frame = new VSizeFrame(ctx); - appender.reset(frame, true); - this.writer = writer; - if (activeTimer) { - this.timer = new Timer(); - this.flushTask = new TimeBasedFlushTask(writer, lock); - timer.scheduleAtFixedRate(flushTask, 0, batchInterval); - } - } - - @Override - public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException { - if (activeTimer) { - synchronized (lock) { - addTupleToFrame(tb); - } - } else { - addTupleToFrame(tb); - } - tuplesInFrame++; - } - - private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException { - if (tuplesInFrame == batchSize - || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples"); - } - FrameUtils.flushFrame(frame.getBuffer(), writer); - tuplesInFrame = 0; - appender.reset(frame, true); - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE); - } - } - } - - @Override - public void close() throws HyracksDataException { - if (appender.getTupleCount() > 0) { - if (activeTimer) { - synchronized (lock) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - } - } else { - FrameUtils.flushFrame(frame.getBuffer(), writer); - } - } - - if (timer != null) { - timer.cancel(); - } - } - - private class TimeBasedFlushTask extends TimerTask { - - private IFrameWriter writer; - private final Object lock; - - public TimeBasedFlushTask(IFrameWriter writer, Object lock) { - this.writer = writer; - this.lock = lock; - } - - @Override - public void run() { - try { - if (tuplesInFrame > 0) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")"); - } - synchronized (lock) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - appender.reset(frame, true); - tuplesInFrame = 0; - } - } - } catch (HyracksDataException e) { - e.printStackTrace(); - } - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 9826be7..22e0c1a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -26,14 +26,12 @@ import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; -import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; @@ -58,10 +56,9 @@ protected long incomingRecordsCount = 0; protected long failedRecordsCount = 0; - public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, - FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser, - IRecordReader<T> recordReader) throws HyracksDataException { - super(ctx, tupleForwarder, feedLogManager, numOfOutputFields); + public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields, + IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException { + super(ctx, feedLogManager, numOfOutputFields); this.dataParser = dataParser; this.recordReader = recordReader; recordReader.setFeedLogManager(feedLogManager); @@ -79,7 +76,7 @@ } Throwable failure = null; try { - tupleForwarder.initialize(ctx, writer); + this.tupleForwarder = new TupleForwarder(ctx, writer); while (hasNext()) { IRawRecord<? extends T> record = next(); if (record == null) { @@ -102,17 +99,14 @@ try { flush(); } catch (Exception flushException) { - tupleForwarder.fail(); flushException.addSuppressed(e); failure = flushException; } } else { failure = e; - tupleForwarder.fail(); } } catch (Throwable e) { failure = e; - tupleForwarder.fail(); LOGGER.log(Level.WARN, "Failure while operating a feed source", e); } finally { failure = finish(failure); @@ -168,11 +162,17 @@ } private Throwable finish(Throwable failure) { - Throwable th = CleanupUtils.close(recordReader, null); - th = DataflowUtils.close(tupleForwarder, th); + Throwable th = CleanupUtils.close(recordReader, failure); + if (th != null) { + try { + tupleForwarder.complete(); + } catch (Throwable completeFailure) { + th = completeFailure; + } + } closeSignal(); setState(State.STOPPED); - return ExceptionUtils.suppress(failure, th); + return th; } private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java index 025520e..ffa42e5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java @@ -31,9 +31,9 @@ private final AsterixInputStream stream; protected long incomingRecordsCount = 0; - public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, - FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) { - super(ctx, tupleForwarder, feedLogManager, 1); + public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, + IStreamDataParser streamParser, AsterixInputStream inputStream) { + super(ctx, feedLogManager, 1); this.dataParser = streamParser; this.stream = inputStream; } @@ -41,7 +41,7 @@ @Override public void start(IFrameWriter writer) throws HyracksDataException { try { - tupleForwarder.initialize(ctx, writer); + tupleForwarder = new TupleForwarder(ctx, writer); while (true) { if (!parseNext()) { break; @@ -50,10 +50,9 @@ tupleForwarder.addTuple(tb); incomingRecordsCount++; } - } catch (Exception e) { + tupleForwarder.complete(); + } catch (Throwable e) { throw HyracksDataException.create(e); - } finally { - tupleForwarder.close(); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java deleted file mode 100644 index f824b67..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.dataflow; - -import java.io.IOException; - -import org.apache.asterix.external.api.ITupleForwarder; -import org.apache.asterix.external.util.DataflowUtils; -import org.apache.asterix.external.util.FeedLogManager; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; - -public class FeedTupleForwarder implements ITupleForwarder { - - private final FeedLogManager feedLogManager; - private FrameTupleAppender appender; - private IFrame frame; - private IFrameWriter writer; - private boolean paused = false; - private boolean initialized; - private boolean failed; - - public FeedTupleForwarder(FeedLogManager feedLogManager) { - this.feedLogManager = feedLogManager; - } - - public FeedLogManager getFeedLogManager() { - return feedLogManager; - } - - @Override - public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException { - if (!initialized) { - this.frame = new VSizeFrame(ctx); - this.writer = writer; - this.appender = new FrameTupleAppender(frame); - initialized = true; - } - } - - @Override - public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException { - if (paused) { - synchronized (this) { - while (paused) { - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); - } - } - } - } - DataflowUtils.addTupleToFrame(appender, tb, writer); - } - - public void pause() { - paused = true; - } - - public synchronized void resume() { - paused = false; - notifyAll(); - } - - @Override - public void close() throws HyracksDataException { - Throwable throwable = null; - try { - if (!failed && appender.getTupleCount() > 0) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - } - } catch (Throwable th) { - throwable = th; - throw th; - } finally { - try { - feedLogManager.close(); - } catch (IOException e) { - if (throwable != null) { - throwable.addSuppressed(e); - } else { - throw HyracksDataException.create(e); - } - } catch (Throwable th) { - if (throwable != null) { - throwable.addSuppressed(th); - } else { - throw th; - } - } - } - } - - public void flush() throws HyracksDataException { - appender.flush(writer); - } - - public void fail() throws HyracksDataException { - failed = true; - writer.fail(); - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java index c7f6d9c..289c16f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java @@ -30,10 +30,9 @@ protected final IRecordWithMetadataParser<T> dataParser; - public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, - FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser, - IRecordReader<T> recordReader) throws HyracksDataException { - super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader); + public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields, + IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException { + super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader); this.dataParser = dataParser; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java deleted file mode 100644 index 18927cd..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.dataflow; - -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.ITupleForwarder; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; - -public class FrameFullTupleForwarder implements ITupleForwarder { - - private FrameTupleAppender appender; - private IFrame frame; - private IFrameWriter writer; - - @Override - public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException { - this.appender = new FrameTupleAppender(); - this.frame = new VSizeFrame(ctx); - this.writer = writer; - appender.reset(frame, true); - } - - @Override - public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException { - boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); - if (!success) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - appender.reset(frame, true); - success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); - if (!success) { - throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE); - } - } - } - - @Override - public void close() throws HyracksDataException { - if (appender.getTupleCount() > 0) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - } - - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java index c4f75e3..b956295 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java @@ -23,7 +23,6 @@ import org.apache.asterix.external.api.IExternalIndexer; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.api.ITupleForwarder; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; @@ -31,10 +30,9 @@ public class IndexingDataFlowController<T> extends RecordDataFlowController<T> { private final IExternalIndexer indexer; - public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder, - IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer) - throws IOException { - super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields()); + public IndexingDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser, + IRecordReader<? extends T> recordReader, IExternalIndexer indexer) throws IOException { + super(ctx, dataParser, recordReader, 1 + indexer.getNumberOfFields()); this.indexer = indexer; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java deleted file mode 100644 index f34b77d..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.dataflow; - -import java.util.Map; - -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.ITupleForwarder; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; - -public class RateControlledTupleForwarder implements ITupleForwarder { - - private FrameTupleAppender appender; - private IFrame frame; - private IFrameWriter writer; - private long interTupleInterval; - private boolean delayConfigured; - - public static final String INTER_TUPLE_INTERVAL = "tuple-interval"; - - private RateControlledTupleForwarder(long interTupleInterval) { - this.interTupleInterval = interTupleInterval; - delayConfigured = interTupleInterval != 0L; - } - - // Factory method - public static RateControlledTupleForwarder create(Map<String, String> configuration) { - long interTupleInterval = 0L; - String propValue = configuration.get(INTER_TUPLE_INTERVAL); - if (propValue != null) { - interTupleInterval = Long.parseLong(propValue); - } - return new RateControlledTupleForwarder(interTupleInterval); - } - - @Override - public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException { - this.appender = new FrameTupleAppender(); - this.frame = new VSizeFrame(ctx); - this.writer = writer; - appender.reset(frame, true); - } - - @Override - public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException { - if (delayConfigured) { - try { - Thread.sleep(interTupleInterval); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); - } - } - boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); - if (!success) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - appender.reset(frame, true); - success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); - if (!success) { - throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE); - } - } - } - - @Override - public void close() throws HyracksDataException { - if (appender.getTupleCount() > 0) { - FrameUtils.flushFrame(frame.getBuffer(), writer); - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java index 0f9572d..aebdefb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java @@ -21,7 +21,6 @@ import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.api.ITupleForwarder; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -33,9 +32,9 @@ protected final IRecordReader<? extends T> recordReader; protected final int numOfTupleFields; - public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder, - IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, int numOfTupleFields) { - super(ctx, tupleForwarder); + public RecordDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser, + IRecordReader<? extends T> recordReader, int numOfTupleFields) { + super(ctx); this.dataParser = dataParser; this.recordReader = recordReader; this.numOfTupleFields = numOfTupleFields; @@ -45,7 +44,7 @@ public void start(IFrameWriter writer) throws HyracksDataException { try { ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields); - tupleForwarder.initialize(ctx, writer); + TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer); while (recordReader.hasNext()) { IRawRecord<? extends T> record = recordReader.next(); tb.reset(); @@ -54,7 +53,7 @@ appendOtherTupleFields(tb); tupleForwarder.addTuple(tb); } - tupleForwarder.close(); + tupleForwarder.complete(); recordReader.close(); } catch (Exception e) { throw new HyracksDataException(e); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java index ccf22da..a28c484 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java @@ -19,7 +19,6 @@ package org.apache.asterix.external.dataflow; import org.apache.asterix.external.api.IStreamDataParser; -import org.apache.asterix.external.api.ITupleForwarder; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -28,9 +27,8 @@ public class StreamDataFlowController extends AbstractDataFlowController { private final IStreamDataParser dataParser; - public StreamDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder, - IStreamDataParser dataParser) { - super(ctx, tupleForwarder); + public StreamDataFlowController(IHyracksTaskContext ctx, IStreamDataParser dataParser) { + super(ctx); this.dataParser = dataParser; } @@ -38,7 +36,7 @@ public void start(IFrameWriter writer) throws HyracksDataException { try { ArrayTupleBuilder tb = new ArrayTupleBuilder(1); - tupleForwarder.initialize(ctx, writer); + TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer); while (true) { tb.reset(); if (!dataParser.parse(tb.getDataOutput())) { @@ -47,7 +45,7 @@ tb.addFieldEndOffset(); tupleForwarder.addTuple(tb); } - tupleForwarder.close(); + tupleForwarder.complete(); } catch (Exception e) { throw new HyracksDataException(e); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java new file mode 100644 index 0000000..afb00b0 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.dataflow; + +import org.apache.asterix.external.util.DataflowUtils; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; + +public class TupleForwarder { + + private final FrameTupleAppender appender; + private final IFrame frame; + private final IFrameWriter writer; + + public TupleForwarder(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException { + this.frame = new VSizeFrame(ctx); + this.writer = writer; + this.appender = new FrameTupleAppender(frame); + } + + public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException { + DataflowUtils.addTupleToFrame(appender, tb, writer); + } + + public void flush() throws HyracksDataException { + appender.flush(writer); + } + + public void complete() throws HyracksDataException { + appender.write(writer, false); + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 0503677..a6c2ddd 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; import org.apache.hyracks.dataflow.common.utils.TaskUtil; @@ -79,7 +80,8 @@ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); message.getBuffer().flip(); run(); - } catch (Exception e) { + } catch (Throwable e) { + CleanupUtils.fail(writer, e); LOGGER.log(Level.WARN, "Failure during data ingestion", e); throw e; } finally { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index 78f24a5..3e53b52 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -42,12 +42,10 @@ import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController; import org.apache.asterix.external.dataflow.FeedRecordDataFlowController; import org.apache.asterix.external.dataflow.FeedStreamDataFlowController; -import org.apache.asterix.external.dataflow.FeedTupleForwarder; import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController; import org.apache.asterix.external.dataflow.IndexingDataFlowController; import org.apache.asterix.external.dataflow.RecordDataFlowController; import org.apache.asterix.external.dataflow.StreamDataFlowController; -import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FeedLogManager; import org.apache.asterix.om.types.ARecordType; @@ -70,35 +68,29 @@ IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory; IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx); if (indexingOp) { - return new IndexingDataFlowController(ctx, - DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser, - recordReader, ((IIndexingDatasource) recordReader).getIndexer()); + return new IndexingDataFlowController(ctx, dataParser, recordReader, + ((IIndexingDatasource) recordReader).getIndexer()); } else if (isFeed) { - FeedTupleForwarder tupleForwarder = - (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager); boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration); boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration); if (isRecordWithMeta) { if (isChangeFeed) { int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration); - return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, - numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader); + return new ChangeFeedWithMetaDataFlowController(ctx, feedLogManager, numOfKeys + 2, + (IRecordWithMetadataParser) dataParser, recordReader); } else { - return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2, + return new FeedWithMetaDataFlowController(ctx, feedLogManager, 2, (IRecordWithMetadataParser) dataParser, recordReader); } } else if (isChangeFeed) { int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration); - return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1, + return new ChangeFeedDataFlowController(ctx, feedLogManager, numOfKeys + 1, (IRecordWithPKDataParser) dataParser, recordReader); } else { - return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser, - recordReader); + return new FeedRecordDataFlowController(ctx, feedLogManager, 1, dataParser, recordReader); } } else { - return new RecordDataFlowController(ctx, - DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser, - recordReader, 1); + return new RecordDataFlowController(ctx, dataParser, recordReader, 1); } case STREAM: IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory; @@ -107,12 +99,9 @@ IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition); streamParser.setInputStream(stream); if (isFeed) { - return new FeedStreamDataFlowController(ctx, - (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager), - feedLogManager, streamParser, stream); + return new FeedStreamDataFlowController(ctx, feedLogManager, streamParser, stream); } else { - return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null), - streamParser); + return new StreamDataFlowController(ctx, streamParser); } default: throw new RuntimeDataException(ErrorCode.PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java index 438f1df..37d400f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java @@ -18,28 +18,15 @@ */ package org.apache.asterix.external.util; -import java.util.Map; - import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.api.ITupleForwarder; -import org.apache.asterix.external.api.ITupleForwarder.TupleForwardPolicy; -import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder; -import org.apache.asterix.external.dataflow.FeedTupleForwarder; -import org.apache.asterix.external.dataflow.FrameFullTupleForwarder; -import org.apache.asterix.external.dataflow.RateControlledTupleForwarder; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class DataflowUtils { - private static final Logger LOGGER = LogManager.getLogger(); private DataflowUtils() { } @@ -54,32 +41,6 @@ } } - public static ITupleForwarder getTupleForwarder(Map<String, String> configuration, FeedLogManager feedLogManager) - throws HyracksDataException { - ITupleForwarder.TupleForwardPolicy policyType = null; - String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY); - if (ExternalDataUtils.isFeed(configuration)) { - // TODO pass this value in the configuration and avoid this check for feeds - policyType = TupleForwardPolicy.FEED; - } else if (propValue == null) { - policyType = TupleForwardPolicy.FRAME_FULL; - } else { - policyType = TupleForwardPolicy.valueOf(propValue.trim().toUpperCase()); - } - switch (policyType) { - case FEED: - return new FeedTupleForwarder(feedLogManager); - case FRAME_FULL: - return new FrameFullTupleForwarder(); - case COUNTER_TIMER_EXPIRED: - return CounterTimerTupleForwarder.create(configuration); - case RATE_CONTROLLED: - return RateControlledTupleForwarder.create(configuration); - default: - throw new RuntimeDataException(ErrorCode.UTIL_DATAFLOW_UTILS_UNKNOWN_FORWARD_POLICY); - } - } - public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer) throws HyracksDataException { if (!appender.append(tuple)) { @@ -88,31 +49,5 @@ throw new RuntimeDataException(ErrorCode.UTIL_DATAFLOW_UTILS_TUPLE_TOO_LARGE); } } - } - - /** - * Close the ITupleForwarder and suppress any Throwable thrown by the close call. - * This method must NEVER throw any Throwable - * - * @param indexHelper - * the indexHelper to close - * @param root - * the first exception encountered during release of resources - * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null - */ - public static Throwable close(ITupleForwarder tupleForwarder, Throwable root) { - if (tupleForwarder != null) { - try { - tupleForwarder.close(); - } catch (Throwable th) { // NOSONAR Will be re-thrown - try { - LOGGER.log(Level.WARN, "Failure closing a closeable resource", th); - } catch (Throwable ignore) { // NOSONAR Logging exception will be ignored - // NOSONAR ignore - } - root = ExceptionUtils.suppress(root, th); - } - } - return root; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java index 9d887b6..8bafb0f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java @@ -20,6 +20,7 @@ import java.io.BufferedReader; import java.io.BufferedWriter; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -35,7 +36,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; -public class FeedLogManager { +public class FeedLogManager implements Closeable { public enum LogEntryType { START, // partition start @@ -64,7 +65,7 @@ public FeedLogManager(File file) throws HyracksDataException { try { this.dir = file.toPath(); - this.completed = new TreeSet<String>(); + this.completed = new TreeSet<>(); if (!exists()) { create(); } @@ -122,6 +123,7 @@ StandardCharsets.UTF_8, StandardOpenOption.APPEND); } + @Override public synchronized void close() throws IOException { count--; if (count > 0) { diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index 6fe938c..8ee8a57 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -28,11 +28,8 @@ import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; -import org.apache.asterix.external.api.ITupleForwarder; +import org.apache.asterix.external.dataflow.TupleForwarder; import org.apache.asterix.external.parser.ADMDataParser; -import org.apache.asterix.external.util.DataflowUtils; -import org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -79,26 +76,19 @@ @Override public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException { ADMDataParser parser; - ITupleForwarder forwarder; + ArrayTupleBuilder tb; IApplicationContext appCtx = (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); ClusterPartition nodePartition = appCtx.getMetadataProperties().getNodePartitions().get(nodeId)[0]; parser = new ADMDataParser(outputType, true); - forwarder = - DataflowUtils - .getTupleForwarder(configuration, - FeedUtils.getFeedLogManager(ctx, FeedUtils.splitsForAdapter( - ExternalDataUtils.getDataverse(configuration), - ExternalDataUtils.getFeedName(configuration), nodeId, nodePartition))); tb = new ArrayTupleBuilder(1); return new ITupleParser() { - @Override public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { try { parser.setInputStream(in); - forwarder.initialize(ctx, writer); + TupleForwarder forwarder = new TupleForwarder(ctx, writer); while (true) { tb.reset(); if (!parser.parse(tb.getDataOutput())) { @@ -107,7 +97,7 @@ tb.addFieldEndOffset(); forwarder.addTuple(tb); } - forwarder.close(); + forwarder.complete(); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java index bf54e01..a116320 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java @@ -107,11 +107,11 @@ @Override public void close() throws HyracksDataException { - writer.close(); - downstreamOpen = false; if (downstreamFailed && !failCalledByUpstream) { throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL); } + writer.close(); + downstreamOpen = false; } public static IFrameWriter enforce(IFrameWriter writer) { -- To view, visit https://asterix-gerrit.ics.uci.edu/2463 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ife679fb9643dc6b39d035e0eecdb915b227503a5 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]>
