http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 5b9b96f..e24c26d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -20,6 +20,8 @@ package org.apache.asterix.external.dataflow; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; @@ -32,16 +34,20 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController { + public enum State { + CREATED, + STARTED, + STOPPED + } + private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName()); private final IRecordDataParser<T> dataParser; private final IRecordReader<T> recordReader; protected final AtomicBoolean closed = new AtomicBoolean(false); protected static final long INTERVAL = 1000; - protected boolean failed = false; + protected State state = State.CREATED; protected long incomingRecordsCount = 0; protected long failedRecordsCount = 0; @@ -57,8 +63,15 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl @Override public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException { + synchronized (this) { + if (state == State.STOPPED) { + return; + } else { + setState(State.STARTED); + } + } + Exception failure = null; try { - failed = false; tupleForwarder.initialize(ctx, writer); while (hasNext()) { IRawRecord<? extends T> record = next(); @@ -74,21 +87,48 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } } catch (HyracksDataException e) { - LOGGER.log(Level.WARN, e); + LOGGER.log(Level.WARNING, "Exception during ingestion", e); //if interrupted while waiting for a new record, then it is safe to not fail forward if (e.getComponent() == ErrorCode.ASTERIX - && e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD) { - // Do nothing + && (e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) { + // Do nothing. interrupted by the active manager + } else if (e.getComponent() == ErrorCode.ASTERIX + && (e.getErrorCode() == ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) { + // Failure but we know we can for sure push the previously parsed records safely + failure = e; + try { + flush(); + } catch (Exception flushException) { + tupleForwarder.fail(); + flushException.addSuppressed(e); + failure = flushException; + } } else { - failed = true; - throw e; + failure = e; + tupleForwarder.fail(); } } catch (Exception e) { - failed = true; - LOGGER.warn("Failure while operating a feed source", e); - throw HyracksDataException.create(e); + failure = e; + tupleForwarder.fail(); + LOGGER.log(Level.WARNING, "Failure while operating a feed source", e); + } finally { + failure = finish(failure); + } + if (failure != null) { + if (failure instanceof InterruptedException) { + throw (InterruptedException) failure; + } + throw HyracksDataException.create(failure); } - finish(); + } + + private synchronized void setState(State newState) { + LOGGER.log(Level.INFO, "State is being set from " + state + " to " + newState); + state = newState; + } + + public synchronized State getState() { + return state; } private IRawRecord<? extends T> next() throws HyracksDataException { @@ -97,47 +137,58 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); } catch (Exception e) { - throw HyracksDataException.create(e); + if (!recordReader.handleException(e)) { + throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e); + } + return null; } } private boolean hasNext() throws HyracksDataException { - boolean hasNext; - try { - hasNext = recordReader.hasNext(); - } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline - throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); - } catch (Exception e) { - throw HyracksDataException.create(e); + while (true) { + try { + return recordReader.hasNext(); + } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline + throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); + } catch (Exception e) { + if (!recordReader.handleException(e)) { + throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e); + } + } } - return hasNext; } - private void finish() throws HyracksDataException { + private Exception finish(Exception failure) { HyracksDataException hde = null; try { - tupleForwarder.close(); - } catch (Throwable th) { + recordReader.close(); + } catch (Exception th) { + LOGGER.log(Level.WARNING, "Failure during while operating a feed source", th); hde = HyracksDataException.suppress(hde, th); } try { - recordReader.close(); - } catch (Throwable th) { - LOGGER.warn("Failure during while operating a feed sourcec", th); + tupleForwarder.close(); + } catch (Exception th) { hde = HyracksDataException.suppress(hde, th); } finally { closeSignal(); } + setState(State.STOPPED); if (hde != null) { - throw hde; + if (failure != null) { + failure.addSuppressed(hde); + } else { + return hde; + } } + return failure; } private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException { try { dataParser.parse(record, tb.getDataOutput()); } catch (Exception e) { - LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e); + LOGGER.log(Level.WARNING, ExternalDataConstants.ERROR_PARSE_RECORD, e); feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD); // continue the outer loop return false; @@ -172,44 +223,31 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl @Override public boolean stop() throws HyracksDataException { - HyracksDataException hde = null; + synchronized (this) { + switch (state) { + case CREATED: + case STOPPED: + setState(State.STOPPED); + return true; + case STARTED: + break; + default: + throw new HyracksDataException("unknown state " + state); + + } + } if (recordReader.stop()) { - if (failed) { - // failed, close here - try { - tupleForwarder.close(); - } catch (Throwable th) { - hde = HyracksDataException.suppress(hde, th); - } - try { - recordReader.close(); - } catch (Throwable th) { - hde = HyracksDataException.suppress(hde, th); - } - if (hde != null) { - throw hde; - } - } else { - try { - waitForSignal(); - } catch (InterruptedException e) { - throw HyracksDataException.create(e); - } + try { + waitForSignal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } return true; } return false; } - @Override - public boolean handleException(Throwable th) throws HyracksDataException { - // This is not a parser record. most likely, this error happened in the record reader. - if (!recordReader.handleException(th)) { - finish(); - } - return !closed.get(); - } - public IRecordReader<T> getReader() { return recordReader; }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java index cad11cd..1f1f545 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java @@ -43,8 +43,7 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController try { tupleForwarder.initialize(ctx, writer); while (true) { - tb.reset(); - if (!dataParser.parse(tb.getDataOutput())) { + if (!parseNext()) { break; } tb.addFieldEndOffset(); @@ -52,12 +51,25 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController incomingRecordsCount++; } } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } finally { tupleForwarder.close(); } } + private boolean parseNext() throws HyracksDataException { + while (true) { + try { + tb.reset(); + return dataParser.parse(tb.getDataOutput()); + } catch (Exception e) { + if (!handleException(e)) { + throw e; + } + } + } + } + @Override public boolean stop() throws HyracksDataException { try { @@ -71,8 +83,7 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController return false; } - @Override - public boolean handleException(Throwable th) { + private boolean handleException(Throwable th) { boolean handled = true; try { handled &= stream.handleException(th); @@ -86,6 +97,7 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController return handled; } + @Override public String getStats() { return "{\"incoming-records-number\": " + incomingRecordsCount + "}"; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java index 3a8130b..f824b67 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java @@ -103,7 +103,7 @@ public class FeedTupleForwarder implements ITupleForwarder { if (throwable != null) { throwable.addSuppressed(e); } else { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } catch (Throwable th) { if (throwable != null) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java index 2b06775..c4f75e3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java @@ -32,8 +32,8 @@ public class IndexingDataFlowController<T> extends RecordDataFlowController<T> { private final IExternalIndexer indexer; public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder, - IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, - IExternalIndexer indexer) throws IOException { + IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer) + throws IOException { super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields()); this.indexer = indexer; } @@ -43,7 +43,7 @@ public class IndexingDataFlowController<T> extends RecordDataFlowController<T> { try { indexer.index(tb); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java index eb5527f..f34b77d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java @@ -71,7 +71,8 @@ public class RateControlledTupleForwarder implements ITupleForwarder { try { Thread.sleep(interTupleInterval); } catch (InterruptedException e) { - throw new HyracksDataException(e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } } boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index 9f32a25..eeda80c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -24,7 +24,6 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; public class FeedAdapter implements IDataSourceAdapter { - private static final long serialVersionUID = 1L; private final AbstractFeedDataFlowController controller; public FeedAdapter(AbstractFeedDataFlowController controller) { @@ -40,10 +39,6 @@ public class FeedAdapter implements IDataSourceAdapter { return controller.stop(); } - public boolean handleException(Throwable e) throws HyracksDataException { - return controller.handleException(e); - } - public boolean pause() throws HyracksDataException { return controller.pause(); } @@ -56,7 +51,7 @@ public class FeedAdapter implements IDataSourceAdapter { return controller.getStats(); } - public void fail() throws HyracksDataException { - controller.fail(); + public AbstractFeedDataFlowController getController() { + return controller; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java index 0681d71..916fe0a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java @@ -25,7 +25,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class GenericAdapter implements IDataSourceAdapter { - private static final long serialVersionUID = 1L; private final IDataFlowController controller; public GenericAdapter(IDataFlowController controller) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java deleted file mode 100644 index d102d0c..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.runtime; - -import org.apache.asterix.external.dataset.adapter.FeedAdapter; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.log4j.Logger; - -/** - * The class in charge of executing feed adapters. - */ -public class AdapterExecutor implements Runnable { - - private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName()); - - private final IFrameWriter writer; // A writer that sends frames to multiple receivers (that can - // increase or decrease at any time) - private final FeedAdapter adapter; // The adapter - private final AdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility --> - private int restartCount = 0; - - public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter, AdapterRuntimeManager adapterManager) { - this.writer = writer; - this.adapter = adapter; - this.adapterManager = adapterManager; - } - - @Override - public void run() { - // Start by getting the partition number from the manager - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Starting ingestion for partition:" + adapterManager.getPartition()); - } - boolean failed = false; - try { - failed = doRun(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - failed = true; - LOGGER.error("Unhandled Exception", e); - } finally { - // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying - // the runtime manager - adapterManager.setFailed(failed); - adapterManager.setDone(true); - synchronized (adapterManager) { - adapterManager.notifyAll(); - } - } - } - - private boolean doRun() throws HyracksDataException, InterruptedException { - boolean continueIngestion = true; - boolean failedIngestion = false; - while (continueIngestion) { - try { - // Start the adapter - adapter.start(adapterManager.getPartition(), writer); - // Adapter has completed execution - continueIngestion = false; - } catch (InterruptedException e) { - adapter.fail(); - throw e; - } catch (Exception e) { - LOGGER.error("Exception during feed ingestion ", e); - continueIngestion = adapter.handleException(e); - if (!continueIngestion) { - adapter.fail(); - } - failedIngestion = !continueIngestion; - restartCount++; - } - } - return failedIngestion; - } - - public String getStats() { - return "{\"adapter-stats\": " + adapter.getStats() + ", \"executor-restart-times\": " + restartCount + "}"; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java deleted file mode 100644 index 1b5eeac..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.runtime; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.dataset.adapter.FeedAdapter; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * This class manages the execution of an adapter within a feed - */ -public class AdapterRuntimeManager { - - private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName()); - - private final EntityId feedId; // (dataverse-feed) - - private final FeedAdapter feedAdapter; // The adapter - - private final AdapterExecutor adapterExecutor; // The executor for the adapter - - private final int partition; // The partition number - - private final IHyracksTaskContext ctx; - - private Future<?> execution; - - private boolean started = false; - private volatile boolean done = false; - private volatile boolean failed = false; - - public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter, - IFrameWriter writer, int partition) { - this.ctx = ctx; - this.feedId = entityId; - this.feedAdapter = feedAdapter; - this.partition = partition; - this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this); - } - - public void start() { - synchronized (adapterExecutor) { - started = true; - if (!done) { - execution = ctx.getExecutorService().submit(adapterExecutor); - } else { - LOGGER.log(Level.WARNING, "Someone stopped me before I even start. I will simply not start"); - } - } - } - - public void stop() throws HyracksDataException, InterruptedException { - synchronized (adapterExecutor) { - try { - if (started) { - try { - ctx.getExecutorService().submit(() -> { - if (feedAdapter.stop()) { - execution.get(); - } - return null; - }).get(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e); - throw e; - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e); - throw HyracksDataException.create(e); - } finally { - execution.cancel(true); - } - } else { - LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts"); - } - } finally { - done = true; - } - } - } - - public EntityId getFeedId() { - return feedId; - } - - @Override - public String toString() { - return feedId + "[" + partition + "]"; - } - - public FeedAdapter getFeedAdapter() { - return feedAdapter; - } - - public AdapterExecutor getAdapterExecutor() { - return adapterExecutor; - } - - public int getPartition() { - return partition; - } - - public boolean isFailed() { - return failed; - } - - public void setFailed(boolean failed) { - this.failed = failed; - } - - public boolean isDone() { - return done; - } - - public void setDone(boolean done) { - this.done = done; - } - - public String getStats() { - return adapterExecutor.getStats(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index cbf784e..982cf5b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -137,7 +137,7 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd restoreConfig(ctx); return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java index 00ac090..c5ca129 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java @@ -21,16 +21,17 @@ package org.apache.asterix.external.input.stream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; import org.apache.asterix.external.util.FileSystemWatcher; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.log4j.Logger; public class LocalFSInputStream extends AsterixInputStream { @@ -155,24 +156,25 @@ public class LocalFSInputStream extends AsterixInputStream { if (in == null) { return false; } - if (th instanceof HyracksDataException - && ((HyracksDataException) th).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) { + Throwable root = ExceptionUtils.getRootCause(th); + if (root instanceof HyracksDataException + && ((HyracksDataException) root).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) { if (currentFile != null) { try { logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file"); } catch (IOException e) { - LOGGER.warn("Filed to write to feed log file", e); + LOGGER.log(Level.WARNING, "Filed to write to feed log file", e); } - LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath()); + LOGGER.log(Level.WARNING, "Corrupted input file: " + currentFile.getAbsolutePath()); } try { advance(); return true; } catch (Exception e) { - LOGGER.warn("An exception was thrown while trying to skip a file", e); + LOGGER.log(Level.WARNING, "An exception was thrown while trying to skip a file", e); } } - LOGGER.warn("Failed to recover from failure", th); + LOGGER.log(Level.WARNING, "Failed to recover from failure", th); return false; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java index 9a0e718..caeaa07 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java @@ -84,7 +84,7 @@ public class SocketClientInputStreamFactory implements IInputStreamFactory { try { return new SocketClientInputStream(sockets.get(partition)); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java index 05931b2..1f1fa5c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java @@ -124,7 +124,7 @@ public class SocketServerInputStreamFactory implements IInputStreamFactory { server.bind(new InetSocketAddress(socket.second)); return new SocketServerInputStream(server); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java index b32006c..12be449 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java @@ -97,7 +97,7 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory { try { return new TwitterFirehoseInputStream(configuration, partition); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java index 0d485924..2b5e248 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java @@ -23,6 +23,8 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.LinkedHashMap; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -95,6 +97,11 @@ import org.apache.hyracks.util.string.UTF8StringReader; public class JObjectAccessors { + private static final Logger LOGGER = Logger.getLogger(JObjectAccessors.class.getName()); + + private JObjectAccessors() { + } + public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) { IJObjectAccessor accessor = null; switch (aTypeTag) { @@ -200,18 +207,16 @@ public class JObjectAccessors { @Override public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objPool) throws HyracksDataException { - IJObject jObject = objPool.allocate(BuiltinType.ANULL); - return jObject; + return objPool.allocate(BuiltinType.ANULL); } } - public static class JMissingAccessor implements IJObjectAccessor { + public static class JMissingAccessor implements IJObjectAccessor { @Override public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objPool) throws HyracksDataException { - IJObject jObject = objPool.allocate(BuiltinType.AMISSING); - return jObject; + return objPool.allocate(BuiltinType.AMISSING); } } @@ -271,7 +276,7 @@ public class JObjectAccessors { try { v = reader.readUTF(new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1))); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } JObjectUtil.getNormalizedString(v); @@ -539,8 +544,8 @@ public class JObjectAccessors { } } catch (Exception e) { - e.printStackTrace(); - throw new HyracksDataException(e); + LOGGER.log(Level.WARNING, "Failure while accessing a java record", e); + throw HyracksDataException.create(e); } return jRecord; } @@ -593,7 +598,7 @@ public class JObjectAccessors { list.add(listItem); } } catch (AsterixException exception) { - throw new HyracksDataException(exception); + throw HyracksDataException.create(exception); } return list; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java index 45d424e..242773e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java @@ -89,7 +89,7 @@ public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOp try { bulkLoader.end(); } catch (Throwable th) { - throw new HyracksDataException(th); + throw HyracksDataException.create(th); } finally { try { indexHelper.close(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java index 6299982..c096f69 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java @@ -89,18 +89,21 @@ public class ExternalLookupOperatorDescriptor extends AbstractSingleActivityOper try { snapshotAccessor.close(); } catch (Throwable th) { - hde = new HyracksDataException(th); + hde = HyracksDataException.create(th); } try { adapter.close(); } catch (Throwable th) { if (hde == null) { - hde = new HyracksDataException(th); + hde = HyracksDataException.create(th); } else { hde.addSuppressed(th); } } } + if (hde != null) { + throw hde; + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 3a06a2b..770e978 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -96,8 +96,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator if (adaptorFactory == null) { adaptorFactory = createExternalAdapterFactory(ctx); } - return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, policyAccessor, - recordDescProvider, this); + return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, recordDescProvider, this); } private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException { @@ -112,7 +111,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator adapterFactory.setOutputType(adapterOutputType); adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } else { RuntimeDataException err = new RuntimeDataException( http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 8c6a420..16b8fba 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -18,15 +18,14 @@ */ package org.apache.asterix.external.operators; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActiveSourceOperatorNodePushable; import org.apache.asterix.active.EntityId; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.dataset.adapter.FeedAdapter; -import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -42,74 +41,97 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil; * The artifacts are lazily activated when a feed receives a subscription request. */ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable { - - private final int partition; - private final IAdapterFactory adapterFactory; + private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName()); private final FeedIntakeOperatorDescriptor opDesc; - private volatile AdapterRuntimeManager adapterRuntimeManager; + private final FeedAdapter adapter; + private boolean poisoned = false; public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory, - int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider, - FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) { + int partition, IRecordDescriptorProvider recordDescProvider, + FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException { super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition)); this.opDesc = feedIntakeOperatorDescriptor; this.recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0); - this.partition = partition; - this.adapterFactory = adapterFactory; + adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, runtimeId.getPartition()); } @Override protected void start() throws HyracksDataException, InterruptedException { + String before = Thread.currentThread().getName(); + Thread.currentThread().setName("Intake Thread"); try { writer.open(); - Thread.currentThread().setName("Intake Thread"); - FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition); - adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition); - IFrame message = new VSizeFrame(ctx); - TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx); + synchronized (this) { + if (poisoned) { + return; + } + } /* * Set null feed message. Feed pipeline carries with it a message with each frame * Initially, the message is set to a null message that can be changed by feed adapters. * One use case is adapters which consume data sources that allow restartability. Such adapters * can propagate progress information through the ingestion pipeline to storage nodes */ + IFrame message = new VSizeFrame(ctx); + TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx); message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); message.getBuffer().flip(); - adapterRuntimeManager.start(); - synchronized (adapterRuntimeManager) { - while (!adapterRuntimeManager.isDone()) { - adapterRuntimeManager.wait(); - } - } - if (adapterRuntimeManager.isFailed()) { - throw new RuntimeDataException( - ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION); - } + run(); } catch (Exception e) { - /* - * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another - * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is - * indicative of a failure at the sibling intake operator location. The surviving intake partitions must - * continue to live and receive data from the external source. - */ - writer.fail(); + LOGGER.log(Level.WARNING, "Failure during data ingestion", e); throw e; } finally { writer.close(); + Thread.currentThread().setName(before); + } + } + + private void run() throws HyracksDataException { + // Start by getting the partition number from the manager + LOGGER.info("Starting ingestion for partition:" + ctx.getTaskAttemptId().getTaskId().getPartition()); + try { + doRun(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Unhandled Exception", e); + throw HyracksDataException.create(e); + } + } + + private void doRun() throws HyracksDataException, InterruptedException { + while (true) { + try { + // Start the adapter + adapter.start(ctx.getTaskAttemptId().getTaskId().getPartition(), writer); + // Adapter has completed execution + return; + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception during feed ingestion ", e); + throw HyracksDataException.create(e); + } } } @Override protected void abort() throws HyracksDataException, InterruptedException { - if (adapterRuntimeManager != null) { - adapterRuntimeManager.stop(); + LOGGER.info(runtimeId + " aborting..."); + synchronized (this) { + poisoned = true; + if (!adapter.stop()) { + LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread..."); + taskThread.interrupt(); + } } } @Override public String getStats() { - if (adapterRuntimeManager != null) { - return adapterRuntimeManager.getStats(); + if (adapter != null) { + return "{\"adapter-stats\": " + adapter.getStats() + "}"; } else { return "\"Runtime stats is not available.\""; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java index e965dce..5fc9df3 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java @@ -1849,7 +1849,7 @@ public class BuiltinClassAdFunctions { result.setBooleanValue(false); List<String> list0 = objectPool.stringArrayListPool.get(); - Set<String> set1 = new HashSet<String>(); + Set<String> set1 = new HashSet<>(); split_string_list(str0, have_delimiter ? delimiter_string.charAt(0) : ',', list0); split_string_set(str1, have_delimiter ? delimiter_string.charAt(0) : ',', set1); @@ -1943,7 +1943,7 @@ public class BuiltinClassAdFunctions { return true; } } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } state.decrementDepth(); expr.setParentScope(state.getCurAd()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java index c362969..2273bea 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java @@ -39,8 +39,6 @@ import org.apache.hyracks.dataflow.std.file.ITupleParserFactory; public class TestTypedAdapter extends FeedAdapter { - private static final long serialVersionUID = 1L; - private final PipedOutputStream pos; private final PipedInputStream pis; @@ -145,11 +143,6 @@ public class TestTypedAdapter extends FeedAdapter { } @Override - public boolean handleException(Throwable e) { - return false; - } - - @Override public boolean pause() { return false; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index 1c28940..5262e1f 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -106,7 +106,7 @@ public class TestTypedAdapterFactory implements IAdapterFactory { } forwarder.close(); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } }; @@ -115,7 +115,7 @@ public class TestTypedAdapterFactory implements IAdapterFactory { try { return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java index f64206e..f15b1e5 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java @@ -95,14 +95,14 @@ public abstract class AbstractListBuilder implements IAsterixListBuilder { this.outputStream.write(data, start + 1, len - 1); } } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } private boolean toWriteTag(byte serializedTypeTag) { boolean toWriteTag = itemTypeTag == ATypeTag.ANY; - toWriteTag = toWriteTag - || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG); + toWriteTag = + toWriteTag || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG); return toWriteTag || (itemTypeTag == ATypeTag.MISSING && serializedTypeTag == ATypeTag.SERIALIZED_MISSING_TYPE_TAG); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java index 111557a..5df04f8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java @@ -118,17 +118,17 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc private final RecordBuilder recordBuilder = new RecordBuilder(); private final RuntimeRecordTypeInfo requiredRecordTypeInfo = new RuntimeRecordTypeInfo(); - private final IBinaryHashFunction putHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE - .createBinaryHashFunction(); - private final IBinaryHashFunction getHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE - .createBinaryHashFunction(); + private final IBinaryHashFunction putHashFunc = + ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction(); + private final IBinaryHashFunction getHashFunc = + ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction(); private final BinaryEntry keyEntry = new BinaryEntry(); private final BinaryEntry valEntry = new BinaryEntry(); private final IVisitablePointable tempValReference = allocator.allocateEmpty(); - private final IBinaryComparator cmp = ListItemBinaryComparatorFactory.INSTANCE - .createBinaryComparator(); - private BinaryHashMap hashMap = new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc, - getHashFunc, cmp); + private final IBinaryComparator cmp = + ListItemBinaryComparatorFactory.INSTANCE.createBinaryComparator(); + private BinaryHashMap hashMap = + new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc, getHashFunc, cmp); private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); private DataOutput out = resultStorage.getDataOutput(); @@ -157,7 +157,6 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc vp0.set(argPtr0); vp1.set(argPtr1); - ARecordVisitablePointable recordPointable = (ARecordVisitablePointable) vp0; AListVisitablePointable listPointable = (AListVisitablePointable) vp1; @@ -207,10 +206,10 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc throw new AsterixException("Expected list of record, got " + PointableHelper.getTypeTag(inputFields.get(i))); } - List<IVisitablePointable> names = ((ARecordVisitablePointable) inputFields.get(i)) - .getFieldNames(); - List<IVisitablePointable> values = ((ARecordVisitablePointable) inputFields.get(i)) - .getFieldValues(); + List<IVisitablePointable> names = + ((ARecordVisitablePointable) inputFields.get(i)).getFieldNames(); + List<IVisitablePointable> values = + ((ARecordVisitablePointable) inputFields.get(i)).getFieldValues(); // Get name and value of the field to be added // Use loop to account for the cases where users switches the order of the fields @@ -241,8 +240,7 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc tempValReference.set(entry.getBuf(), entry.getOffset(), entry.getLength()); // If value is not equal throw conflicting duplicate field, otherwise ignore if (!PointableHelper.byteArrayEqual(valuePointable, tempValReference)) { - throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, - getIdentifier()); + throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier()); } } else { if (pos > -1) { @@ -256,7 +254,7 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc } } } catch (AsterixException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } }; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java index 3957c06..f2deb74 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java @@ -72,8 +72,8 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat @Override public void close() throws HyracksDataException { try { - INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext() - .getServiceContext().getApplicationContext(); + INcApplicationContext appCtx = + (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); IDatasetLifecycleManager datasetLifeCycleManager = appCtx.getDatasetLifecycleManager(); ILockManager lockManager = appCtx.getTransactionSubsystem().getLockManager(); ITransactionManager txnManager = appCtx.getTransactionSubsystem().getTransactionManager(); @@ -84,7 +84,7 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat // flush the dataset synchronously datasetLifeCycleManager.flushDataset(datasetId.getId(), false); } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java index b835b3a..b7a4c14 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java @@ -65,12 +65,12 @@ public class GlobalResourceIdFactory implements IResourceIdFactory { ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg); reponse = resourceIdResponseQ.take(); if (reponse.getException() != null) { - throw new HyracksDataException(reponse.getException().getMessage()); + throw HyracksDataException.create(reponse.getException()); } } return reponse.getResourceId(); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index ab7d657..b22a257 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -395,10 +395,11 @@ public class ClusterStateManager implements IClusterStateManager { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Registering intention to remove node id " + nodeId); } - if (!activeNcConfiguration.containsKey(nodeId)) { + if (activeNcConfiguration.containsKey(nodeId)) { + pendingRemoval.add(nodeId); + } else { LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal"); } - pendingRemoval.add(nodeId); } public synchronized boolean cancelRemovePending(String nodeId) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java index 367616e..f76cb89 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java @@ -46,8 +46,8 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i private int pkHash; public LockThenSearchOperationCallback(DatasetId datasetId, int[] entityIdFields, - ITransactionSubsystem txnSubsystem, - ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) { + ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx, + IOperatorNodePushable operatorNodePushable) { super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager()); this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable; this.logManager = txnSubsystem.getLogManager(); @@ -118,7 +118,7 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx); } } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java index e8de90d..b13a08e 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java @@ -56,7 +56,7 @@ public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperatio try { lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx); } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } @@ -70,7 +70,7 @@ public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperatio try { lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx); } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } @@ -80,7 +80,7 @@ public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperatio try { lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx); } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java index 9e96fbb..a6cb61c 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java @@ -89,7 +89,7 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx); } } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } @@ -99,7 +99,7 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields); log(pkHash, after, before); } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index 5527f47..932c925 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -74,7 +74,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true); return modCallback; } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 8c5b099..b339d27 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -70,7 +70,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false); return modCallback; } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java index a27f987..f897aca 100644 --- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java +++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java @@ -20,11 +20,11 @@ package org.apache.asterix.transaction.management.service.locking; import static org.mockito.Mockito.mock; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.hyracks.api.io.IIOManager; @@ -35,11 +35,11 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; class TestRuntimeContextProvider implements IAppRuntimeContextProvider { - ThreadExecutor ate = new ThreadExecutor(Executors.defaultThreadFactory()); + ExecutorService ate = Executors.newCachedThreadPool(Executors.defaultThreadFactory()); IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class); @Override - public ThreadExecutor getThreadExecutor() { + public ExecutorService getThreadExecutor() { return ate; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java index 8394057..067579e 100644 --- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java +++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.algebricks.common.constraints; +import java.util.Arrays; + public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionConstraint { private final String[] locations; @@ -33,4 +35,10 @@ public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionCo public String[] getLocations() { return locations; } + + @Override + public String toString() { + return Arrays.toString(locations); + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java index 68274ce..2314f88 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java @@ -71,8 +71,8 @@ public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor @Override public void contributeActivities(IActivityGraphBuilder builder) { - SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode( - new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); + SplitterMaterializerActivityNode sma = + new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); builder.addActivity(this, sma); builder.addSourceEdge(0, sma, 0); for (int i = 0; i < outputArity; i++) { @@ -168,7 +168,7 @@ public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor writers[i].close(); } catch (Throwable th) { if (hde == null) { - hde = new HyracksDataException(th); + hde = HyracksDataException.create(th); } else { hde.addSuppressed(th); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java index 024f6f5..82f403e 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java @@ -137,8 +137,8 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream()); outputPipe = new Thread(fso); outputPipe.start(); - DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(), - System.err); + DumpInStreamToPrintStream disps = + new DumpInStreamToPrintStream(process.getErrorStream(), System.err); dumpStderr = new Thread(disps); dumpStderr.start(); } catch (IOException e) { @@ -174,7 +174,8 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt outputPipe.join(); dumpStderr.join(); } catch (InterruptedException e) { - throw new HyracksDataException(e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } if (ret != 0) { throw new HyracksDataException("Process exit value: " + ret); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java index f4c5114..0e70759 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java @@ -59,7 +59,7 @@ public class IoUtil { public static void delete(File file) throws HyracksDataException { try { if (file.isDirectory()) { - FileUtils.deleteDirectory(file); + deleteDirectory(file); } else { Files.delete(file.toPath()); } @@ -89,4 +89,39 @@ public class IoUtil { throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_FILE, e, fileRef.getAbsolutePath()); } } + + public static void deleteDirectory(File directory) throws IOException { + if (!directory.exists()) { + return; + } + if (!FileUtils.isSymlink(directory)) { + cleanDirectory(directory); + } + Files.delete(directory.toPath()); + } + + public static void cleanDirectory(final File directory) throws IOException { + final File[] files = verifiedListFiles(directory); + for (final File file : files) { + delete(file); + } + } + + private static File[] verifiedListFiles(File directory) throws IOException { + if (!directory.exists()) { + final String message = directory + " does not exist"; + throw new IllegalArgumentException(message); + } + + if (!directory.isDirectory()) { + final String message = directory + " is not a directory"; + throw new IllegalArgumentException(message); + } + + final File[] files = directory.listFiles(); + if (files == null) { // null if security restricted + throw new IOException("Failed to list contents of " + directory); + } + return files; + } }