abdullah alamoudi has submitted this change and it was merged.

Change subject: Control Number of Readers for LocalFS Data
......................................................................
Control Number of Readers for LocalFS Data

Change-Id: Ib9d5ece656220d5f562cc385f882c5ddfd3283a6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/776
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
M asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
M asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
A asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
M asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
M asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
M asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
25 files changed, 450 insertions(+), 346 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified

diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index b98618d..d8f1893 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -20,6 +20,8 @@
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,6
+66,7 @@ /** * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor. + * * @param primaryFeed * @param metadataProvider * @return JobSpecification the Hyracks job specification for receiving data from external source @@ -251,12 +254,18 @@ public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception { JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations(); + AlgebricksAbsolutePartitionConstraint allCluster = AsterixClusterProperties.INSTANCE.getClusterLocations(); + Set<String> nodes = new TreeSet<>(); + for (String node : allCluster.getLocations()) { + nodes.add(node); + } + AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint( + nodes.toArray(new String[nodes.size()])); FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil .splitProviderAndPartitionConstraints(feedLogFileSplits); - FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first); + FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second); spec.addRoot(frod); return spec; diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java index c77ca10..d5765f1 100644 --- a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java +++ b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java @@ -32,7 +32,7 @@ JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName()); - FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first); + FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second); jobSpec.addRoot(frod); return jobSpec; diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 041f706..d3abd50 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -39,6 +39,7 @@ import org.apache.asterix.external.util.ExternalDataCompatibilityUtils; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.FeedLogManager; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -58,6 +59,7 @@ private boolean isFeed; private FileSplit[] 
feedLogFileSplits; private ARecordType metaType; + private FeedLogManager feedLogManager = null; @Override public void setSnapshot(List<ExternalFile> files, boolean indexingOp) { @@ -86,8 +88,14 @@ } catch (AsterixException e) { throw new HyracksDataException(e); } + if (isFeed) { + if (feedLogManager == null) { + feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits); + } + feedLogManager.touch(); + } IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition, - dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits); + dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager); if (isFeed) { return new FeedAdapter((AbstractFeedDataFlowController) controller); } else { diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java index 83d7a3a..a4c2fae 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java @@ -22,11 +22,13 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.FeedLogManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class AsterixInputStream extends InputStream { protected AbstractFeedDataFlowController controller; protected FeedLogManager logManager; + protected IStreamNotificationHandler notificationHandler; public abstract boolean stop() throws Exception; @@ -38,7 +40,11 @@ } // TODO: Find a better way to send notifications - public void setFeedLogManager(FeedLogManager logManager) { + public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException { this.logManager = logManager; } + + public void setNotificationHandler(IStreamNotificationHandler notificationHandler) { + this.notificationHandler = notificationHandler; + } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java index 11e2472..9cce1c9 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java @@ -23,9 +23,11 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.FeedLogManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * This interface represents a record reader that reads data from external source as a set of records + * * @param <T> */ public interface IRecordReader<T> extends Closeable { @@ -33,7 +35,7 @@ /** * @return true if the reader has more records remaining, false, otherwise. * @throws Exception - * if an error takes place + * if an error takes place */ public boolean hasNext() throws Exception; @@ -46,6 +48,7 @@ /** * used to stop reader from producing more records. + * * @return true if the connection to the external source has been suspended, false otherwise. */ public boolean stop(); @@ -61,8 +64,10 @@ /** * set a pointer to the log manager of the feed. 
the log manager can be used to log * progress and errors + * + * @throws HyracksDataException */ - public void setFeedLogManager(FeedLogManager feedLogManager); + public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException; /** * gives the record reader a chance to recover from IO errors during feed intake diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java new file mode 100644 index 0000000..8b014ad --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.api; + +public interface IStreamNotificationHandler { + + /** + * Used to notify a handler that the stream is about to start reading data from a new source. + * An example use is by the parser to skip CSV file headers in case the stream reads from a set of files. 
+ */ + public void notifyNewSource(); +} diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java index 8ec422f..a301ac9 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java @@ -25,6 +25,7 @@ import org.apache.asterix.external.api.IRecordWithPKDataParser; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> { @@ -33,7 +34,8 @@ public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder, final FeedLogManager feedLogManager, final int numOfOutputFields, - final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) { + final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) + throws HyracksDataException { super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader); this.dataParser = dataParser; } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java index 370eec0..aac7be2 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java @@ -25,13 +25,15 @@ import org.apache.asterix.external.parser.RecordWithMetadataParser; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> { public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder, final FeedLogManager feedLogManager, final int numOfOutputFields, - final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) { + final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) + throws HyracksDataException { super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 387e2dc..a092620 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -45,7 +45,7 @@ public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, @Nonnull FeedLogManager feedLogManager, int 
numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser, - @Nonnull IRecordReader<T> recordReader) { + @Nonnull IRecordReader<T> recordReader) throws HyracksDataException { super(ctx, tupleForwarder, feedLogManager, numOfOutputFields); this.dataParser = dataParser; this.recordReader = recordReader; diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java index 203b5a7..e7c396b 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java @@ -25,6 +25,7 @@ import org.apache.asterix.external.parser.RecordWithMetadataParser; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> { @@ -34,7 +35,7 @@ public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser, - IRecordReader<T> recordReader) { + IRecordReader<T> recordReader) throws HyracksDataException { super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader); this.dataParser = dataParser; } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java index 2c2dd98..6eee892 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java @@ -31,6 +31,7 @@ import org.apache.asterix.external.util.FeedLogManager; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource { @@ -73,7 +74,7 @@ } @Override - public void setFeedLogManager(FeedLogManager feedLogManager) { + public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException { reader.setFeedLogManager(feedLogManager); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java index 8572fc7..59b72e4 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java @@ -26,87 +26,96 @@ public class LineRecordReader extends StreamRecordReader { + private final boolean hasHeader; protected boolean prevCharCR; protected int newlineLength; protected int recordNumber = 0; + protected boolean nextIsHeader = false; public LineRecordReader(final boolean hasHeader, final 
AsterixInputStream stream) throws HyracksDataException { super(stream); - try { - if (hasHeader) { - if (hasNext()) { - next(); - } - } - } catch (final IOException e) { - throw new HyracksDataException(e); + this.hasHeader = hasHeader; + if (hasHeader) { + stream.setNotificationHandler(this); } + } + @Override + public void notifyNewSource() { + if (hasHeader) { + nextIsHeader = true; + } } @Override public boolean hasNext() throws IOException { - if (done) { - return false; - } - /* - * We're reading data from in, but the head of the stream may be - * already buffered in buffer, so we have several cases: - * 1. No newline characters are in the buffer, so we need to copy - * everything and read another buffer from the stream. - * 2. An unambiguously terminated line is in buffer, so we just - * copy to record. - * 3. Ambiguously terminated line is in buffer, i.e. buffer ends - * in CR. In this case we copy everything up to CR to record, but - * we also need to see what follows CR: if it's LF, then we - * need consume LF as well, so next call to readLine will read - * from after that. - * We use a flag prevCharCR to signal if previous character was CR - * and, if it happens to be at the end of the buffer, delay - * consuming it until we have a chance to look at the char that - * follows. - */ - newlineLength = 0; //length of terminating newline - prevCharCR = false; //true of prev char was CR - record.reset(); - int readLength = 0; - do { - int startPosn = bufferPosn; //starting from where we left off the last time - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - bufferLength = reader.read(inputBuffer); - if (bufferLength <= 0) { - if (readLength > 0) { - record.endRecord(); - recordNumber++; - return true; + while (true) { + if (done) { + return false; + } + /* + * We're reading data from in, but the head of the stream may be + * already buffered in buffer, so we have several cases: + * 1. No newline characters are in the buffer, so we need to copy + * everything and read another buffer from the stream. + * 2. An unambiguously terminated line is in buffer, so we just + * copy to record. + * 3. Ambiguously terminated line is in buffer, i.e. buffer ends + * in CR. In this case we copy everything up to CR to record, but + * we also need to see what follows CR: if it's LF, then we + * need consume LF as well, so next call to readLine will read + * from after that. + * We use a flag prevCharCR to signal if previous character was CR + * and, if it happens to be at the end of the buffer, delay + * consuming it until we have a chance to look at the char that + * follows. + */ + newlineLength = 0; //length of terminating newline + prevCharCR = false; //true of prev char was CR + record.reset(); + int readLength = 0; + do { + int startPosn = bufferPosn; //starting from where we left off the last time + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + bufferLength = reader.read(inputBuffer); + if (bufferLength <= 0) { + if (readLength > 0) { + record.endRecord(); + recordNumber++; + return true; + } + close(); + return false; //EOF } - close(); - return false; //EOF } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline - if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { - newlineLength = (prevCharCR) ? 
2 : 1; - ++bufferPosn; // at next invocation proceed from following byte - break; + for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline + if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { + newlineLength = (prevCharCR) ? 2 : 1; + ++bufferPosn; // at next invocation proceed from following byte + break; + } + if (prevCharCR) { //CR + notLF, we are at notLF + newlineLength = 1; + break; + } + prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); } - if (prevCharCR) { //CR + notLF, we are at notLF - newlineLength = 1; - break; + readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; //CR at the end of the buffer + prevCharCR = false; } - prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); + if (readLength > 0) { + record.append(inputBuffer, startPosn, readLength); + } + } while (newlineLength == 0); + if (nextIsHeader) { + nextIsHeader = false; + continue; } - readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) { - --readLength; //CR at the end of the buffer - prevCharCR = false; - } - if (readLength > 0) { - record.append(inputBuffer, startPosn, readLength); - } - } while (newlineLength == 0); - recordNumber++; - return true; + recordNumber++; + return true; + } } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java index 515e0e5..88964a1 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java @@ -44,76 +44,82 @@ @Override public boolean hasNext() throws IOException { - if (done) { - return false; - } - newlineLength = 0; - prevCharCR = false; - prevCharEscape = false; - record.reset(); - int readLength = 0; - inQuote = false; - do { - int startPosn = bufferPosn; - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - bufferLength = reader.read(inputBuffer); - if (bufferLength <= 0) { - { - if (readLength > 0) { - if (inQuote) { - throw new IOException("malformed input record ended inside quote"); + while (true) { + if (done) { + return false; + } + newlineLength = 0; + prevCharCR = false; + prevCharEscape = false; + record.reset(); + int readLength = 0; + inQuote = false; + do { + int startPosn = bufferPosn; + if (bufferPosn >= bufferLength) { + startPosn = bufferPosn = 0; + bufferLength = reader.read(inputBuffer); + if (bufferLength <= 0) { + { + if (readLength > 0) { + if (inQuote) { + throw new IOException("malformed input record ended inside quote"); + } + record.endRecord(); + recordNumber++; + return true; } - record.endRecord(); - recordNumber++; - return true; + close(); + return false; } - close(); - return false; } } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { - if (!inQuote) { - if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { - newlineLength = (prevCharCR) ? 
2 : 1; - ++bufferPosn; - break; - } - if (prevCharCR) { - newlineLength = 1; - break; - } - prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); - if (inputBuffer[bufferPosn] == quote) { - if (!prevCharEscape) { - inQuote = true; + for (; bufferPosn < bufferLength; ++bufferPosn) { + if (!inQuote) { + if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { + newlineLength = (prevCharCR) ? 2 : 1; + ++bufferPosn; + break; } - } - if (prevCharEscape) { - prevCharEscape = false; + if (prevCharCR) { + newlineLength = 1; + break; + } + prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); + if (inputBuffer[bufferPosn] == quote) { + if (!prevCharEscape) { + inQuote = true; + } + } + if (prevCharEscape) { + prevCharEscape = false; + } else { + prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; + } } else { + // only look for next quote + if (inputBuffer[bufferPosn] == quote) { + if (!prevCharEscape) { + inQuote = false; + } + } prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; } - } else { - // only look for next quote - if (inputBuffer[bufferPosn] == quote) { - if (!prevCharEscape) { - inQuote = false; - } - } - prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE; } + readLength = bufferPosn - startPosn; + if (prevCharCR && newlineLength == 0) { + --readLength; + } + if (readLength > 0) { + record.append(inputBuffer, startPosn, readLength); + } + } while (newlineLength == 0); + if (nextIsHeader) { + nextIsHeader = false; + continue; } - readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) { - --readLength; - } - if (readLength > 0) { - record.append(inputBuffer, startPosn, readLength); - } - } while (newlineLength == 0); - recordNumber++; - return true; + recordNumber++; + return true; + } } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java index 57ef3ae..7dc5bce 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java @@ -23,13 +23,16 @@ import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.api.IStreamNotificationHandler; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.CharArrayRecord; import org.apache.asterix.external.input.stream.AsterixInputStreamReader; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hyracks.api.exceptions.HyracksDataException; -public abstract class StreamRecordReader implements IRecordReader<char[]> { +public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler { protected final AsterixInputStreamReader reader; protected CharArrayRecord record; protected char[] inputBuffer; @@ -37,6 +40,7 @@ protected int bufferPosn = 0; protected boolean done = false; protected FeedLogManager feedLogManager; + protected MutableBoolean newFile = new MutableBoolean(false); public 
StreamRecordReader(AsterixInputStream inputStream) { this.reader = new AsterixInputStreamReader(inputStream); @@ -72,7 +76,8 @@ public abstract boolean hasNext() throws IOException; @Override - public void setFeedLogManager(FeedLogManager feedLogManager) { + public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException { + this.feedLogManager = feedLogManager; reader.setFeedLogManager(feedLogManager); } @@ -85,4 +90,9 @@ public boolean handleException(Throwable th) { return reader.handleException(th); } + + @Override + public void notifyNewSource() { + throw new UnsupportedOperationException(); + } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java index 7e280a5..94333d1 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java @@ -29,6 +29,7 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class AsterixInputStreamReader extends Reader { private AsterixInputStream in; @@ -56,7 +57,7 @@ in.setController(controller); } - public void setFeedLogManager(FeedLogManager feedLogManager) { + public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException { in.setFeedLogManager(feedLogManager); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java index 00c1eb7..2519177 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java @@ -21,47 +21,37 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.Path; -import java.util.Map; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; import org.apache.asterix.external.util.FileSystemWatcher; -import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.file.FileSplit; import org.apache.log4j.Logger; public class LocalFSInputStream extends AsterixInputStream { private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName()); - private final Path path; private final FileSystemWatcher watcher; private FileInputStream in; private byte lastByte; private File currentFile; - public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx, - final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed) - throws IOException { - this.path = fileSplits[partition].getLocalFile().getFile().toPath(); - this.watcher = new FileSystemWatcher(path, expression, isFeed); - this.watcher.init(); - } - 
- @Override - public void setFeedLogManager(FeedLogManager logManager) { - super.setFeedLogManager(logManager); - watcher.setFeedLogManager(logManager); + public LocalFSInputStream(FileSystemWatcher watcher) throws IOException { + this.watcher = watcher; } @Override public void setController(AbstractFeedDataFlowController controller) { super.setController(controller); - watcher.setController(controller); } + + @Override + public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException { + super.setFeedLogManager(logManager); + watcher.setFeedLogManager(logManager); + }; @Override public void close() throws IOException { @@ -86,6 +76,9 @@ private void closeFile() throws IOException { if (in != null) { + if (logManager != null) { + logManager.endPartition(currentFile.getAbsolutePath()); + } try { in.close(); } finally { @@ -100,9 +93,18 @@ */ private boolean advance() throws IOException { closeFile(); - if (watcher.hasNext()) { - currentFile = watcher.next(); + currentFile = watcher.poll(); + if (currentFile == null) { + if (controller != null) { + controller.flush(); + } + currentFile = watcher.take(); + } + if (currentFile != null) { in = new FileInputStream(currentFile); + if (notificationHandler != null) { + notificationHandler.notifyNewSource(); + } return true; } return false; @@ -141,6 +143,7 @@ @Override public boolean stop() throws Exception { + closeFile(); watcher.close(); return true; } @@ -165,18 +168,11 @@ advance(); return true; } catch (Exception e) { - return false; - } - } else { - try { - watcher.init(); - } catch (IOException e) { - LOGGER.warn("Failed to initialize watcher during failure recovery", e); - return false; + LOGGER.warn("An exception was thrown while trying to skip a file", e); } } - return true; } + LOGGER.warn("Failed to recover from failure", th); return false; } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java index 85d0e41..08fce87 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java @@ -20,10 +20,14 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IInputStreamFactory; @@ -32,8 +36,9 @@ import org.apache.asterix.external.input.stream.LocalFSInputStream; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.asterix.external.util.FeedUtils; +import org.apache.asterix.external.util.FileSystemWatcher; import org.apache.asterix.external.util.NodeResolverFactory; +import org.apache.asterix.om.util.AsterixAppContextInfo; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -49,16 +54,27 @@ protected static 
INodeResolver nodeResolver; protected Map<String, String> configuration; protected FileSplit[] inputFileSplits; - protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage protected boolean isFeed; protected String expression; // transient fields (They don't need to be serialized and transferred) private transient AlgebricksAbsolutePartitionConstraint constraints; + private transient FileSystemWatcher watcher; @Override - public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException { + public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + if (watcher == null) { + String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId(); + ArrayList<Path> inputResources = new ArrayList<>(); + for (int i = 0; i < inputFileSplits.length; i++) { + if (inputFileSplits[i].getNodeName().equals(nodeName)) { + inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath()); + } + } + watcher = new FileSystemWatcher(inputResources, expression, isFeed); + } try { - return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed); + return new LocalFSInputStream(watcher); } catch (IOException e) { throw new HyracksDataException(e); } @@ -81,10 +97,6 @@ configureFileSplits(splits); configurePartitionConstraint(); this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration); - if (isFeed) { - feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration), - ExternalDataUtils.getFeedName(configuration), constraints); - } this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION); } @@ -94,6 +106,7 @@ } private void configureFileSplits(String[] splits) throws AsterixException { + INodeResolver resolver = getNodeResolver(); if (inputFileSplits == null) { inputFileSplits = new FileSplit[splits.length]; String nodeName; @@ -106,7 +119,7 @@ throw new AsterixException( "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\""); } - nodeName = trimmedValue.split(":")[0]; + nodeName = resolver.resolveNode(trimmedValue.split(":")[0]); nodeLocalPath = trimmedValue.split("://")[1]; FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath))); inputFileSplits[count++] = fileSplit; @@ -115,13 +128,21 @@ } private void configurePartitionConstraint() throws AsterixException { - String[] locs = new String[inputFileSplits.length]; - String location; + Map<String, ClusterPartition[]> partitions = AsterixAppContextInfo.getInstance().getMetadataProperties() + .getNodePartitions(); + List<String> locs = new ArrayList<>(); for (int i = 0; i < inputFileSplits.length; i++) { - location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName()); - locs[i] = location; + String location = inputFileSplits[i].getNodeName(); + if (!locs.contains(location)) { + int numOfPartitions = partitions.get(location).length; + int j = 0; + while (j < numOfPartitions) { + locs.add(location); + j++; + } + } } - constraints = new AlgebricksAbsolutePartitionConstraint(locs); + constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()])); } protected INodeResolver getNodeResolver() { diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java 
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index d362201..6ba27d8 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -52,7 +52,6 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.file.FileSplit; public class DataflowControllerProvider { @@ -60,13 +59,9 @@ @SuppressWarnings({ "rawtypes", "unchecked" }) public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx, int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory, - Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits) - throws HyracksDataException { + Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager) + throws HyracksDataException { try { - FeedLogManager feedLogManager = null; - if (isFeed) { - feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits); - } switch (dataSourceFactory.getDataSourceType()) { case RECORDS: IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory; diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java index fc15d3c..5bb8ec3 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java @@ -38,8 +38,7 @@ START, // partition start END, // partition end COMMIT, // a record commit within a partition - SNAPSHOT // an identifier that partitions with identifiers before this one should be - // ignored + SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored } public static final String PROGRESS_LOG_FILE_NAME = "progress.log"; @@ -55,6 +54,7 @@ private BufferedWriter errorLogger; private BufferedWriter recordLogger; private final StringBuilder stringBuilder = new StringBuilder(); + private int count = 0; public FeedLogManager(File file) throws HyracksDataException { try { @@ -69,18 +69,22 @@ } } - public void endPartition() throws IOException { + public synchronized void touch() { + count++; + } + + public synchronized void endPartition() throws IOException { logProgress(END_PREFIX + currentPartition); completed.add(currentPartition); } - public void endPartition(String partition) throws IOException { + public synchronized void endPartition(String partition) throws IOException { currentPartition = partition; logProgress(END_PREFIX + currentPartition); completed.add(currentPartition); } - public void startPartition(String partition) throws IOException { + public synchronized void startPartition(String partition) throws IOException { currentPartition = partition; logProgress(START_PREFIX + currentPartition); } @@ -89,7 +93,7 @@ return Files.exists(dir); } - public void open() throws IOException { + public synchronized void open() throws IOException { // read content of logs. 
BufferedReader reader = Files.newBufferedReader( Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME)); @@ -113,13 +117,17 @@ StandardCharsets.UTF_8, StandardOpenOption.APPEND); } - public void close() throws IOException { + public synchronized void close() throws IOException { + count--; + if (count > 0) { + return; + } progressLogger.close(); errorLogger.close(); recordLogger.close(); } - public boolean create() throws IOException { + public synchronized boolean create() throws IOException { File f = dir.toFile(); f.mkdirs(); new File(f, PROGRESS_LOG_FILE_NAME).createNewFile(); @@ -128,13 +136,13 @@ return true; } - public boolean destroy() throws IOException { + public synchronized boolean destroy() throws IOException { File f = dir.toFile(); FileUtils.deleteDirectory(f); return true; } - public void logProgress(String log) throws IOException { + public synchronized void logProgress(String log) throws IOException { stringBuilder.setLength(0); stringBuilder.append(log); stringBuilder.append(ExternalDataConstants.LF); @@ -142,7 +150,7 @@ progressLogger.flush(); } - public void logError(String error, Throwable th) throws IOException { + public synchronized void logError(String error, Throwable th) throws IOException { stringBuilder.setLength(0); stringBuilder.append(error); stringBuilder.append(ExternalDataConstants.LF); @@ -152,7 +160,7 @@ errorLogger.flush(); } - public void logRecord(String record, String errorMessage) throws IOException { + public synchronized void logRecord(String record, String errorMessage) throws IOException { stringBuilder.setLength(0); stringBuilder.append(record); stringBuilder.append(ExternalDataConstants.LF); @@ -166,7 +174,7 @@ return log.substring(PREFIX_SIZE); } - public boolean isSplitRead(String split) { + public synchronized boolean isSplitRead(String split) { return completed.contains(split); } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index 5ab41af..502a432 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -45,17 +45,16 @@ return dataverseName + File.separator + feedName; } - public static FileSplit splitsForAdapter(String dataverseName, String feedName, int partition, - ClusterPartition[] nodePartitions) { + public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName, + ClusterPartition partition) { File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName)); String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName(); - ClusterPartition nodePartition = nodePartitions[0]; String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName, - nodePartition.getPartitionId()); - // format: 'storage dir name'/partition_#/dataverse/feed/adapter_# - File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator - + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition); - return StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f); + partition.getPartitionId()); + // Note: feed adapter instances in a single node share the feed logger + // format: 'storage dir name'/partition_#/dataverse/feed/node + File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName); + return 
StoragePathUtil.getFileSplitForClusterPartition(partition, f); } public static FileSplit[] splitsForAdapter(String dataverseName, String feedName, @@ -63,22 +62,11 @@ if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) { throw new AsterixException("Can't create file splits for adapter with count partitioning constraints"); } - File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName)); - String[] locations = null; - locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations(); + String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations(); List<FileSplit> splits = new ArrayList<FileSplit>(); - String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName(); - int i = 0; for (String nd : locations) { - // Always get the first partition - ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]; - String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName, - nodePartition.getPartitionId()); - // format: 'storage dir name'/partition_#/dataverse/feed/adapter_# - File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator - + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i); - splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f)); - i++; + splits.add(splitsForAdapter(dataverseName, feedName, nd, + AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0])); } return splits.toArray(new FileSplit[] {}); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java index 4eec348..b15d097 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java @@ -33,8 +33,9 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; -import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -48,44 +49,51 @@ private Iterator<File> it; private final String expression; private FeedLogManager logManager; - private final Path path; + private final List<Path> paths; private final boolean isFeed; private boolean done; - private File current; - private AbstractFeedDataFlowController controller; private final LinkedList<Path> dirs; + private final ReentrantLock lock = new ReentrantLock(); - public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) { + public FileSystemWatcher(List<Path> inputResources, String expression, boolean isFeed) throws HyracksDataException { + this.isFeed = isFeed; this.keys = isFeed ? 
new HashMap<WatchKey, Path>() : null; this.expression = expression; - this.path = inputResource; - this.isFeed = isFeed; + this.paths = inputResources; this.dirs = new LinkedList<Path>(); + if (!isFeed) { + init(); + } } - public void setFeedLogManager(FeedLogManager feedLogManager) { - this.logManager = feedLogManager; + public synchronized void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException { + if (logManager == null) { + this.logManager = feedLogManager; + init(); + } } - public void init() throws HyracksDataException { + public synchronized void init() throws HyracksDataException { try { dirs.clear(); - LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs); - it = files.iterator(); - if (isFeed) { - keys.clear(); - if (watcher != null) { - try { - watcher.close(); - } catch (IOException e) { - LOGGER.warn("Failed to close watcher service", e); + for (Path path : paths) { + LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs); + it = files.iterator(); + if (isFeed) { + keys.clear(); + if (watcher != null) { + try { + watcher.close(); + } catch (IOException e) { + LOGGER.warn("Failed to close watcher service", e); + } } + watcher = FileSystems.getDefault().newWatchService(); + for (Path dirPath : dirs) { + register(dirPath); + } + resume(); } - watcher = FileSystems.getDefault().newWatchService(); - for (Path path : dirs) { - register(path); - } - resume(); } } catch (IOException e) { throw new HyracksDataException(e); @@ -102,7 +110,7 @@ keys.put(key, dir); } - private void resume() throws IOException { + private synchronized void resume() throws IOException { if (logManager == null) { return; } @@ -142,14 +150,12 @@ } for (WatchEvent<?> event : key.pollEvents()) { Kind<?> kind = event.kind(); - // TODO: Do something about overflow events // An overflow event means that some events were dropped if (kind == StandardWatchEventKinds.OVERFLOW) { if (LOGGER.isEnabledFor(Level.WARN)) { LOGGER.warn("Overflow event. Some events might have been missed"); } // need to read and validate all files. 
- //TODO: use btrees for all logs init(); return; } @@ -174,33 +180,90 @@ } } } + it = files.iterator(); } - public void close() throws IOException { + public synchronized void close() throws IOException { if (!done) { if (watcher != null) { watcher.close(); watcher = null; } - if (logManager != null) { - if (current != null) { - logManager.startPartition(current.getAbsolutePath()); - logManager.endPartition(); - } - logManager.close(); - current = null; - } done = true; } } - public File next() throws IOException { - if ((current != null) && (logManager != null)) { - logManager.startPartition(current.getAbsolutePath()); - logManager.endPartition(); + // poll is not blocking + public synchronized File poll() throws IOException { + if (it.hasNext()) { + return it.next(); } - current = it.next(); - return current; + if (done || !isFeed) { + return null; + } + files.clear(); + it = files.iterator(); + if (keys.isEmpty()) { + close(); + return null; + } + // Read new Events (Polling first to add all available files) + WatchKey key; + key = watcher.poll(); + while (key != null) { + handleEvents(key); + if (endOfEvents(key)) { + close(); + return null; + } + key = watcher.poll(); + } + return null; + } + + // take is blocking + public synchronized File take() throws IOException { + File next = poll(); + if (next != null) { + return next; + } + if (done || !isFeed) { + return null; + } + // No file was found, wait for the filesystem to push events + WatchKey key = null; + lock.lock(); + try { + while (!it.hasNext()) { + try { + key = watcher.take(); + } catch (InterruptedException x) { + if (LOGGER.isEnabledFor(Level.WARN)) { + LOGGER.warn("Feed Closed"); + } + if (watcher == null) { + return null; + } + continue; + } catch (ClosedWatchServiceException e) { + if (LOGGER.isEnabledFor(Level.WARN)) { + LOGGER.warn("The watcher has exited"); + } + if (watcher == null) { + return null; + } + continue; + } + handleEvents(key); + if (endOfEvents(key)) { + return null; + } + } + } finally { + lock.unlock(); + } + // files were found, re-create the iterator and move it one step + return it.next(); } private boolean endOfEvents(WatchKey key) { @@ -212,65 +275,5 @@ } } return false; - } - - public boolean hasNext() throws IOException { - if (it.hasNext()) { - return true; - } - if (done || !isFeed) { - return false; - } - files.clear(); - if (keys.isEmpty()) { - return false; - } - // Read new Events (Polling first to add all available files) - WatchKey key; - key = watcher.poll(); - while (key != null) { - handleEvents(key); - if (endOfEvents(key)) { - close(); - return false; - } - key = watcher.poll(); - } - // No file was found, wait for the filesystem to push events - if (controller != null) { - controller.flush(); - } - while (files.isEmpty()) { - try { - key = watcher.take(); - } catch (InterruptedException x) { - if (LOGGER.isEnabledFor(Level.WARN)) { - LOGGER.warn("Feed Closed"); - } - if (watcher == null) { - return false; - } - continue; - } catch (ClosedWatchServiceException e) { - if (LOGGER.isEnabledFor(Level.WARN)) { - LOGGER.warn("The watcher has exited"); - } - if (watcher == null) { - return false; - } - continue; - } - handleEvents(key); - if (endOfEvents(key)) { - return false; - } - } - // files were found, re-create the iterator and move it one step - it = files.iterator(); - return it.hasNext(); - } - - public void setController(AbstractFeedDataFlowController controller) { - this.controller = controller; } } diff --git 
a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java index 354aedb..876639d 100644 --- a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java +++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java @@ -18,7 +18,10 @@ */ package org.apache.asterix.external.classad.test; +import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -32,8 +35,7 @@ import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader; import org.apache.asterix.external.input.stream.LocalFSInputStream; import org.apache.asterix.external.library.ClassAdParser; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.dataflow.std.file.FileSplit; +import org.apache.asterix.external.util.FileSystemWatcher; import junit.framework.Test; import junit.framework.TestCase; @@ -69,10 +71,10 @@ ClassAdParser parser = new ClassAdParser(objectPool); CharArrayLexerSource lexerSource = new CharArrayLexerSource(); for (String path : files) { - LocalFSInputStream in = new LocalFSInputStream( - new FileSplit[] { new FileSplit("", - new FileReference(Paths.get(getClass().getResource(path).toURI()).toFile())) }, - null, null, 0, null, false); + List<Path> paths = new ArrayList<>(); + paths.add(Paths.get(getClass().getResource(path).toURI())); + FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); + LocalFSInputStream in = new LocalFSInputStream(watcher); SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]"); Value val = new Value(objectPool); while (recordReader.hasNext()) { diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index 7da6389..d1a4532 100644 --- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -80,16 +80,16 @@ IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext() .getApplicationObject(); - ClusterPartition[] nodePartitions = propertiesProvider.getMetadataProperties().getNodePartitions() - .get(nodeId); + ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions() + .get(nodeId)[0]; try { parser = new ADMDataParser(outputType, true); forwarder = DataflowUtils .getTupleForwarder(configuration, FeedUtils.getFeedLogManager(ctx, FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration), - ExternalDataUtils.getFeedName(configuration), partition, - nodePartitions))); + ExternalDataUtils.getFeedName(configuration), nodeId, + nodePartition))); tb = new ArrayTupleBuilder(1); } catch (Exception e) { throw new HyracksDataException(e); diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java index 
fc6e725..faff9df 100644 --- a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java +++ b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.PrintStream; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.asterix.external.parser.ADMDataParser; import org.apache.asterix.external.parser.RecordWithMetadataParser; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.FileSystemWatcher; import org.apache.asterix.formats.nontagged.AqlADMPrinterFactoryProvider; import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; import org.apache.asterix.om.types.ARecordType; @@ -42,9 +44,7 @@ import org.apache.hyracks.algebricks.data.IPrinterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.std.file.FileSplit; import org.junit.Assert; import org.junit.Test; @@ -79,12 +79,11 @@ int[] pkIndexes = { 0 }; int[] pkIndicators = { 1 }; + List<Path> paths = new ArrayList<>(); + paths.add(Paths.get(getClass().getResource("/beer.csv").toURI())); + FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); // create input stream - LocalFSInputStream inputStream = new LocalFSInputStream( - new FileSplit[] { new FileSplit(null, - new FileReference(Paths.get(getClass().getResource("/beer.csv").toURI()).toFile())) }, - null, null, 0, null, false); - + LocalFSInputStream inputStream = new LocalFSInputStream(watcher); // create reader record reader QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream, ExternalDataConstants.DEFAULT_QUOTE); diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java index 51c3802..0e9aa0c 100644 --- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java +++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Set; +import org.apache.hyracks.control.cc.ClusterControllerService; + /** * Utility class for obtaining information on the set of Hyracks NodeController * processes that are running on a given host. 
@@ -54,6 +56,8 @@
     }

     public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
-        AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+        ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getControllerService();
+        map.putAll(ccs.getIpAddressNodeNameMap());
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/776
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ib9d5ece656220d5f562cc385f882c5ddfd3283a6
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
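The core of the merged change is that all local-filesystem readers on a node now share one FileSystemWatcher: each LocalFSInputStream first tries the non-blocking poll(), flushes any buffered tuples through its controller if nothing is ready, then blocks in take(), and notifies its IStreamNotificationHandler whenever it switches to a new file so header-aware readers (e.g. LineRecordReader with hasHeader == true) can skip that file's header line. The following is a minimal, hypothetical Java sketch of that pattern only; SharedFileWatcher, LocalFileStream, and NewSourceListener are simplified stand-ins, not the AsterixDB classes themselves.

// Hypothetical sketch of the shared-watcher consumption pattern introduced by this change.
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SharedFileWatcher {
    private final BlockingQueue<File> discovered = new LinkedBlockingQueue<>();

    // Producer side: a directory-watching thread would call this for each newly seen file.
    void offer(File f) {
        discovered.offer(f);
    }

    // Non-blocking: returns null when no file is currently available.
    File poll() {
        return discovered.poll();
    }

    // Blocking: waits until a file shows up.
    File take() throws InterruptedException {
        return discovered.take();
    }
}

interface NewSourceListener {
    void notifyNewSource(); // e.g. a CSV reader marks "next line is a header" here
}

class LocalFileStream {
    private final SharedFileWatcher watcher;
    private final NewSourceListener listener;
    private final Runnable flushBufferedOutput;

    LocalFileStream(SharedFileWatcher watcher, NewSourceListener listener, Runnable flushBufferedOutput) {
        this.watcher = watcher;
        this.listener = listener;
        this.flushBufferedOutput = flushBufferedOutput;
    }

    // Advance to the next input file, mirroring the poll-then-take logic of LocalFSInputStream.advance():
    // try poll() first; if nothing is ready, flush downstream buffers, then block in take().
    File advance() throws InterruptedException {
        File next = watcher.poll();
        if (next == null) {
            flushBufferedOutput.run(); // don't hold buffered tuples while parked waiting for new files
            next = watcher.take();
        }
        if (next != null && listener != null) {
            listener.notifyNewSource(); // header-skipping readers reset their per-file state here
        }
        return next;
    }
}

Trying poll() before take() is what keeps a feed responsive in the merged code: frames that are already buffered are pushed downstream (controller.flush()) before the reader blocks waiting for the filesystem to deliver more files.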
