This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new d9764ab1dc [NO ISSUE][OTH] Additional external indexing clean up d9764ab1dc is described below commit d9764ab1dc0d3f02e29aef257f5506c7e0afb297 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Sat Mar 25 20:03:10 2023 +0300 [NO ISSUE][OTH] Additional external indexing clean up - user model changes: yes - storage format changes: no - interface changes: yes Details: - Remove REFRESH statement. - More external indexing runtime removal. Change-Id: Ide588c3933979edae763b810dfa6f8a34116945f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17449 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Al Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../asterix/app/translator/QueryTranslator.java | 88 ------------- .../adapter/factory/GenericAdapterFactory.java | 22 +--- .../external/api/IExternalDataSourceFactory.java | 9 -- .../asterix/external/api/IExternalIndexer.java | 56 -------- .../external/api/IIndexibleExternalDataSource.java | 34 ----- .../external/api/IIndexingAdapterFactory.java | 27 ---- .../asterix/external/api/IIndexingDatasource.java | 50 -------- .../dataflow/IndexingDataFlowController.java | 47 ------- .../external/input/HDFSDataSourceFactory.java | 69 +++------- .../record/reader/IndexingStreamRecordReader.java | 101 --------------- .../AbstractExternalInputStreamFactory.java | 5 - .../input/record/reader/hdfs/HDFSRecordReader.java | 53 +------- .../record/reader/rss/RSSRecordReaderFactory.java | 5 - .../reader/twitter/TwitterRecordReaderFactory.java | 5 - .../external/input/stream/HDFSInputStream.java | 64 +--------- .../stream/factory/LocalFSInputStreamFactory.java | 5 - .../factory/SocketServerInputStreamFactory.java | 5 - .../factory/TwitterFirehoseStreamFactory.java | 5 - .../provider/DataflowControllerProvider.java | 9 +- .../apache/asterix/lang/common/base/Statement.java | 1 - .../statement/RefreshExternalDatasetStatement.java | 64 ---------- .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj | 19 --- .../declared/BTreeResourceFactoryProvider.java | 6 +- .../metadata/declared/MetadataProvider.java | 27 +--- .../metadata/lock/ExternalDatasetsRegistry.java | 142 --------------------- .../apache/asterix/metadata/utils/DatasetUtil.java | 16 --- .../utils/ExternalDatasetAccessManager.java | 122 ------------------ .../apache/asterix/metadata/utils/IndexUtil.java | 3 - .../utils/SecondaryIndexOperationsHelper.java | 6 - 29 files changed, 30 insertions(+), 1035 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 399b7b4e1a..15a8238a6b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -94,7 +94,6 @@ import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; -import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; @@ -187,7 +186,6 @@ import org.apache.asterix.metadata.entities.Synonym; import org.apache.asterix.metadata.entities.ViewDetails; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil; -import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; @@ -499,9 +497,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen case COMPACT: handleCompactStatement(metadataProvider, stmt, hcc); break; - case EXTERNAL_DATASET_REFRESH: - handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc); - break; case FUNCTION_DECL: handleDeclareFunctionStatement(metadataProvider, stmt); break; @@ -1549,9 +1544,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception { ProgressState progress = ProgressState.NO_PROGRESS; boolean bActiveTxn = true; - Index filesIndex = null; - boolean firstExternalDatasetIndex = false; - boolean datasetLocked = false; MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); JobSpecification spec; try { @@ -1559,7 +1551,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (ds.getDatasetType() == DatasetType.INTERNAL) { validateDatasetState(metadataProvider, ds, sourceLoc); } else if (ds.getDatasetType() == DatasetType.EXTERNAL) { - // External dataset throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, dataset() + " using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter() + " adapter can't be indexed"); } @@ -1666,17 +1657,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen index.getDatasetName(), index.getIndexName()); index.setPendingOp(MetadataUtil.PENDING_NO_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); - // add another new files index with PendingNoOp after deleting the index with - // PendingAddOp - if (firstExternalDatasetIndex) { - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(), - index.getDatasetName(), filesIndex.getIndexName()); - filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP); - MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); - // update transaction timestamp - ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date()); - MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); - } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { if (bActiveTxn) { @@ -1701,38 +1681,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - if (firstExternalDatasetIndex) { - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - // Drop External Files from metadata - MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException( - "System is inconsistent state: pending files for(" + index.getDataverseName() + "." - + index.getDatasetName() + ") couldn't be removed from the metadata", - e); - } - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - try { - // Drop the files index from metadata - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), - index.getDataverseName(), index.getDatasetName(), - IndexingConstants.getFilesIndexName(index.getDatasetName())); - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e2) { - e.addSuppressed(e2); - abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending index(" - + index.getDataverseName() + "." + index.getDatasetName() + "." - + IndexingConstants.getFilesIndexName(index.getDatasetName()) - + ") couldn't be removed from the metadata", e); - } - } // remove the record from the metadata. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -1749,10 +1697,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } throw e; - } finally { - if (datasetLocked) { - ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex); - } } } @@ -1843,7 +1787,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen doDropDataverse(stmtDropDataverse, metadataProvider, hcc, requestParameters); } finally { metadataProvider.getLocks().unlock(); - ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -1856,7 +1799,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); List<FeedEventsListener> feedsToStop = new ArrayList<>(); - List<Dataset> externalDatasetsToDeregister = new ArrayList<>(); List<JobSpecification> jobsToExecute = new ArrayList<>(); try { Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); @@ -1906,13 +1848,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } break; case EXTERNAL: - indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); - for (Index index : indexes) { - jobsToExecute - .add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc)); - } - externalDatasetsToDeregister.add(dataset); - break; case VIEW: break; } @@ -1939,10 +1874,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen bActiveTxn = false; progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; - for (Dataset externalDataset : externalDatasetsToDeregister) { - ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(externalDataset); - } - for (FeedEventsListener feedListener : feedsToStop) { if (feedListener.getState() != ActivityState.STOPPED) { feedListener.stop(metadataProvider); @@ -2045,7 +1976,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen requestParameters, true, sourceLoc); } finally { metadataProvider.getLocks().unlock(); - ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -2154,7 +2084,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen doDropIndex(metadataProvider, stmtIndexDrop, dataverseName, datasetName, hcc, requestParameters); } finally { metadataProvider.getLocks().unlock(); - ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -2165,8 +2094,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String indexName = stmtIndexDrop.getIndexName().getValue(); ProgressState progress = ProgressState.NO_PROGRESS; List<JobSpecification> jobsToExecute = new ArrayList<>(); - // For external index - boolean dropFilesIndex = false; MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; @@ -2208,8 +2135,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. finally, delete the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); - } else { - // External dataset } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); return true; @@ -2236,10 +2161,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen try { MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, indexName); - if (dropFilesIndex) { - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, IndexingConstants.getFilesIndexName(datasetName)); - } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e2) { e.addSuppressed(e2); @@ -2648,7 +2569,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen doDropView(metadataProvider, stmtDrop, dataverseName, viewName); } finally { metadataProvider.getLocks().unlock(); - ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -4479,7 +4399,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw e; } finally { metadataProvider.getLocks().unlock(); - ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -4520,8 +4439,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen @Override public void unlock() { metadataProvider.getLocks().unlock(); - // release external datasets' locks acquired during compilation of the query - ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); compilationLock.readLock().unlock(); } }; @@ -4745,11 +4662,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleExternalDatasetRefreshStatement(MetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - // no op - } - @Override public DataverseName getActiveDataverseName(DataverseName dataverseName) { return dataverseName != null ? dataverseName : activeDataverse.getDataverseName(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 8ec5af037e..9ca87b873f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -19,7 +19,6 @@ package org.apache.asterix.external.adapter.factory; import java.util.Collections; -import java.util.List; import java.util.Map; import org.apache.asterix.common.api.INcApplicationContext; @@ -30,13 +29,10 @@ import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IDataFlowController; import org.apache.asterix.external.api.IDataParserFactory; import org.apache.asterix.external.api.IExternalDataSourceFactory; -import org.apache.asterix.external.api.IIndexibleExternalDataSource; -import org.apache.asterix.external.api.IIndexingAdapterFactory; import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.asterix.external.dataset.adapter.GenericAdapter; -import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.provider.DataflowControllerProvider; import org.apache.asterix.external.provider.DatasourceFactoryProvider; import org.apache.asterix.external.provider.ParserFactoryProvider; @@ -59,7 +55,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAdapterFactory { +public class GenericAdapterFactory implements ITypedAdapterFactory { private static final long serialVersionUID = 1L; private static final Logger LOGGER = LogManager.getLogger(); @@ -67,19 +63,11 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAda private IDataParserFactory dataParserFactory; private ARecordType recordType; private Map<String, String> configuration; - private List<ExternalFile> files; - private boolean indexingOp; private boolean isFeed; private FileSplit[] feedLogFileSplits; private ARecordType metaType; private transient FeedLogManager feedLogManager; - @Override - public void setSnapshot(List<ExternalFile> files, boolean indexingOp) { - this.files = files; - this.indexingOp = indexingOp; - } - @Override public String getAlias() { return ExternalDataConstants.ALIAS_GENERIC_ADAPTER; @@ -111,7 +99,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAda feedLogManager.touch(); } IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition, - dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager); + dataSourceFactory, dataParserFactory, configuration, isFeed, feedLogManager); if (isFeed) { return new FeedAdapter((AbstractFeedDataFlowController) controller); } else { @@ -124,9 +112,6 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAda if (dataSourceFactory == null) { dataSourceFactory = createExternalDataSourceFactory(configuration); // create and configure parser factory - if (dataSourceFactory.isIndexible() && (files != null)) { - ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp); - } dataSourceFactory.configure(serviceContext, configuration, warningCollector); } if (dataParserFactory == null) { @@ -145,9 +130,6 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAda ICcApplicationContext appCtx = (ICcApplicationContext) serviceContext.getApplicationContext(); ExternalDataUtils.validateDataSourceParameters(configuration); dataSourceFactory = createExternalDataSourceFactory(configuration); - if (dataSourceFactory.isIndexible() && (files != null)) { - ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp); - } dataSourceFactory.configure(serviceContext, configuration, warningCollector); ExternalDataUtils.validateDataParserParameters(configuration); dataParserFactory = createDataParserFactory(configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java index 17cbff446e..e5c4b3fcc4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java @@ -67,15 +67,6 @@ public interface IExternalDataSourceFactory extends Serializable { void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException; - /** - * Specify whether the external data source can be indexed - * - * @return - */ - default boolean isIndexible() { - return false; - } - /** * returns the passed partition constraints if not null, otherwise returns round robin absolute partition * constraints that matches the count. diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java deleted file mode 100644 index c261ae3485..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.api; - -import java.io.IOException; -import java.io.Serializable; - -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; - -/** - * This Interface represents the component responsible for adding record IDs to tuples when indexing external data - */ -public interface IExternalIndexer extends Serializable { - - /** - * This method is called by an indexible datasource when the external source reader have been updated. - * this gives a chance for the indexer to update its reader specific values (i,e. file name) - * - * @param reader - * the new reader - * @throws Exception - */ - public void reset(IIndexingDatasource reader) throws IOException; - - /** - * This method is called by the dataflow controller with each tuple. the indexer is expected to append record ids to the tuple. - * - * @param tb - * @throws Exception - */ - public void index(ArrayTupleBuilder tb) throws IOException; - - /** - * This method returns the number of fields in the record id. It is used by tuple appender at the initialization step. - * - * @return - * @throws Exception - */ - public int getNumberOfFields() throws IOException; -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java deleted file mode 100644 index accd730d07..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.api; - -import java.util.List; - -import org.apache.asterix.external.indexing.ExternalFile; - -public interface IIndexibleExternalDataSource extends IExternalDataSourceFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); - - /** - * Specify whether the external data source is configured for indexing - * - * @return - */ - public boolean isIndexingOp(); -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java deleted file mode 100644 index 8d420465ef..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.api; - -import java.util.List; - -import org.apache.asterix.external.indexing.ExternalFile; - -public interface IIndexingAdapterFactory extends ITypedAdapterFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java deleted file mode 100644 index 5381ef7d2b..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.api; - -import java.util.List; - -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.RecordReader; - -/** - * An interface for external data sources which support indexing - */ -public interface IIndexingDatasource { - /** - * @return an external indexer that is used to write RID fields for each record - */ - public IExternalIndexer getIndexer(); - - /** - * @return a list of external files being accessed - */ - public List<ExternalFile> getSnapshot(); - - /** - * @return the index of the currently being read file - */ - public int getCurrentSplitIndex(); - - /** - * @return an HDFS record reader that is used to get the current position in the file - */ - public RecordReader<?, ? extends Writable> getReader(); -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java deleted file mode 100644 index b956295539..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.dataflow; - -import java.io.IOException; - -import org.apache.asterix.external.api.IExternalIndexer; -import org.apache.asterix.external.api.IRecordDataParser; -import org.apache.asterix.external.api.IRecordReader; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; - -public class IndexingDataFlowController<T> extends RecordDataFlowController<T> { - private final IExternalIndexer indexer; - - public IndexingDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser, - IRecordReader<? extends T> recordReader, IExternalIndexer indexer) throws IOException { - super(ctx, dataParser, recordReader, 1 + indexer.getNumberOfFields()); - this.indexer = indexer; - } - - @Override - protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws HyracksDataException { - try { - indexer.index(tb); - } catch (IOException e) { - throw HyracksDataException.create(e); - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 596edb2801..f22d12887f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -32,12 +32,9 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.AsterixInputStream; -import org.apache.asterix.external.api.IExternalIndexer; -import org.apache.asterix.external.api.IIndexibleExternalDataSource; +import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader; import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader; import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader; import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; @@ -62,7 +59,7 @@ import org.apache.hyracks.hdfs.dataflow.ConfFactory; import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory; import org.apache.hyracks.hdfs.scheduler.Scheduler; -public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IIndexibleExternalDataSource { +public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IExternalDataSourceFactory { private static final long serialVersionUID = 1L; private static final List<String> recordReaderNames = Collections.singletonList("hdfs"); @@ -77,7 +74,6 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd protected static Scheduler hdfsScheduler; protected static Boolean initialized = false; protected static Object initLock = new Object(); - protected List<ExternalFile> files; protected Map<String, String> configuration; protected Class<?> recordClass; protected boolean indexingOp = false; @@ -107,19 +103,14 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd confFactory = new ConfFactory(conf); clusterLocations = getPartitionConstraint(); int numPartitions = clusterLocations.getLocations().length; - // if files list was set, we restrict the splits to the list - InputSplit[] inputSplits; - if (files == null) { - inputSplits = getInputSplits(conf, numPartitions); - } else { - inputSplits = HDFSUtils.getSplits(conf, files); - } - readSchedule = hdfsScheduler.getLocationConstraints(inputSplits); - inputSplitsFactory = new InputSplitsFactory(inputSplits); + InputSplit[] configInputSplits = getInputSplits(conf, numPartitions); + readSchedule = hdfsScheduler.getLocationConstraints(configInputSplits); + inputSplitsFactory = new InputSplitsFactory(configInputSplits); read = new boolean[readSchedule.length]; Arrays.fill(read, false); if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) { - RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL); + RecordReader<?, ?> reader = + conf.getInputFormat().getRecordReader(configInputSplits[0], conf, Reporter.NULL); this.recordClass = reader.createValue().getClass(); reader.close(); } else if (formatString.equals(ExternalDataConstants.FORMAT_PARQUET)) { @@ -152,24 +143,15 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd return conf.getInputFormat().getSplits(conf, numPartitions); } - // Used to tell the factory to restrict the splits to the intersection between this list a - // actual files on hde - @Override - public void setSnapshot(List<ExternalFile> files, boolean indexingOp) { - this.files = files; - this.indexingOp = indexingOp; - } - /* * The method below was modified to take care of the following * 1. when target files are not null, it generates a file aware input stream that validate * against the files * 2. if the data is binary, it returns a generic reader */ - public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer) - throws HyracksDataException { + public AsterixInputStream createInputStream(IHyracksTaskContext ctx) throws HyracksDataException { try { restoreConfig(ctx); - return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer); + return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration); } catch (Exception e) { throw HyracksDataException.create(e); } @@ -213,10 +195,6 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd } } - public JobConf getJobConf() throws HyracksDataException { - return confFactory.getConf(); - } - @Override public DataSourceType getDataSourceType() { return ExternalDataUtils.getDataSourceType(configuration); @@ -235,15 +213,10 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { try { - IExternalIndexer indexer = null; if (recordReaderClazz != null) { StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); - streamReader.configure(ctx, createInputStream(ctx, partition, indexer), configuration); - if (indexer != null) { - return new IndexingStreamRecordReader(streamReader, indexer); - } else { - return streamReader; - } + streamReader.configure(ctx, createInputStream(ctx), configuration); + return streamReader; } restoreConfig(ctx); JobConf readerConf = conf; @@ -257,8 +230,8 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd */ readerConf = confFactory.getConf(); } - return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf, files, - indexer, ctx.getWarningCollector()); + return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf, + ctx.getWarningCollector()); } catch (Exception e) { throw HyracksDataException.create(e); } @@ -269,29 +242,19 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd return recordClass; } - @Override - public boolean isIndexible() { - return true; - } - - @Override - public boolean isIndexingOp() { - return ((files != null) && indexingOp); - } - @Override public List<String> getRecordReaderNames() { return recordReaderNames; } private static IRecordReader<? extends Object> createRecordReader(Map<String, String> configuration, boolean[] read, - InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf, List<ExternalFile> files, - IExternalIndexer indexer, IWarningCollector warningCollector) throws IOException { + InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf, + IWarningCollector warningCollector) { if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT.trim()) .equals(ExternalDataConstants.INPUT_FORMAT_PARQUET)) { return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, warningCollector); } else { - return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, files, indexer); + return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf); } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java deleted file mode 100644 index 6eee8920b9..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader; - -import java.io.IOException; -import java.util.List; - -import org.apache.asterix.external.api.IExternalIndexer; -import org.apache.asterix.external.api.IIndexingDatasource; -import org.apache.asterix.external.api.IRawRecord; -import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; -import org.apache.asterix.external.util.FeedLogManager; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource { - - private StreamRecordReader reader; - private IExternalIndexer indexer; - - public IndexingStreamRecordReader(StreamRecordReader reader, IExternalIndexer indexer) { - this.reader = reader; - this.indexer = indexer; - } - - @Override - public void close() throws IOException { - reader.close(); - } - - @Override - public IExternalIndexer getIndexer() { - return indexer; - } - - @Override - public boolean hasNext() throws Exception { - return reader.hasNext(); - } - - @Override - public IRawRecord<char[]> next() throws IOException, InterruptedException { - return reader.next(); - } - - @Override - public boolean stop() { - return reader.stop(); - } - - @Override - public void setController(AbstractFeedDataFlowController controller) { - reader.setController(controller); - } - - @Override - public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException { - reader.setFeedLogManager(feedLogManager); - } - - @Override - public List<ExternalFile> getSnapshot() { - return null; - } - - @Override - public int getCurrentSplitIndex() { - return -1; - } - - @Override - public RecordReader<?, ? extends Writable> getReader() { - return null; - } - - @Override - public boolean handleException(Throwable th) { - return reader.handleException(th); - } - -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java index eac4835ff7..7313b31078 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java @@ -48,11 +48,6 @@ public abstract class AbstractExternalInputStreamFactory implements IInputStream return DataSourceType.STREAM; } - @Override - public boolean isIndexible() { - return false; - } - @Override public abstract AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java index 9fbc8007b2..48b99d1be6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java @@ -19,35 +19,18 @@ package org.apache.asterix.external.input.record.reader.hdfs; import java.io.IOException; -import java.util.List; -import org.apache.asterix.external.api.IExternalIndexer; -import org.apache.asterix.external.api.IIndexingDatasource; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hyracks.api.exceptions.HyracksDataException; -public class HDFSRecordReader<K, V extends Writable> extends AbstractHDFSRecordReader<K, V> - implements IIndexingDatasource { - // Indexing variables - private final IExternalIndexer indexer; - private final List<ExternalFile> snapshot; - private final FileSystem hdfs; +public class HDFSRecordReader<K, V extends Writable> extends AbstractHDFSRecordReader<K, V> { public HDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName, - JobConf conf, List<ExternalFile> snapshot, IExternalIndexer indexer) throws IOException { + JobConf conf) { super(read, inputSplits, readSchedule, nodeName, conf); - this.indexer = indexer; - this.snapshot = snapshot; - this.hdfs = FileSystem.get(conf); } @SuppressWarnings("unchecked") @@ -58,26 +41,11 @@ public class HDFSRecordReader<K, V extends Writable> extends AbstractHDFSRecordR key = reader.createKey(); value = reader.createValue(); } - if (indexer != null) { - try { - indexer.reset(this); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } return reader; } @Override - protected boolean onNextInputSplit() throws IOException { - if (snapshot != null) { - String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath(); - FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName)); - // Skip if not the same file stored in the files snapshot - if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime().getTime()) { - return true; - } - } + protected boolean onNextInputSplit() { return false; } @@ -86,21 +54,6 @@ public class HDFSRecordReader<K, V extends Writable> extends AbstractHDFSRecordR return false; } - @Override - public IExternalIndexer getIndexer() { - return indexer; - } - - @Override - public List<ExternalFile> getSnapshot() { - return snapshot; - } - - @Override - public int getCurrentSplitIndex() { - return currentSplitIndex; - } - @Override public RecordReader<K, V> getReader() { return reader; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java index b52636a31d..1a5d2a25d9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java @@ -84,11 +84,6 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntry> { return recordReaderNames; } - @Override - public boolean isIndexible() { - return false; - } - @Override public IRecordReader<? extends SyndEntry> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 0fd0b95247..52d8c03ed5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -124,11 +124,6 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<char[]> } } - @Override - public boolean isIndexible() { - return false; - } - @Override public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java index 4cbfaa3026..46c21021fa 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java @@ -19,30 +19,20 @@ package org.apache.asterix.external.input.stream; import java.io.IOException; -import java.util.List; import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.api.AsterixInputStream; -import org.apache.asterix.external.api.IExternalIndexer; -import org.apache.asterix.external.api.IIndexingDatasource; -import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hyracks.api.exceptions.HyracksDataException; -public class HDFSInputStream extends AsterixInputStream implements IIndexingDatasource { +public class HDFSInputStream extends AsterixInputStream { private RecordReader<Object, Text> reader; private Text value = null; @@ -54,16 +44,11 @@ public class HDFSInputStream extends AsterixInputStream implements IIndexingData private String[] readSchedule; private String nodeName; private JobConf conf; - // Indexing variables - private final IExternalIndexer indexer; - private final List<ExternalFile> snapshot; - private final FileSystem hdfs; private int pos = 0; @SuppressWarnings("unchecked") public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName, - JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot, IExternalIndexer indexer) - throws IOException, AsterixException { + JobConf conf, Map<String, String> configuration) throws IOException, AsterixException { this.read = read; this.inputSplits = inputSplits; this.readSchedule = readSchedule; @@ -71,16 +56,8 @@ public class HDFSInputStream extends AsterixInputStream implements IIndexingData this.conf = conf; this.inputFormat = conf.getInputFormat(); this.reader = new EmptyRecordReader<Object, Text>(); - this.snapshot = snapshot; - this.hdfs = FileSystem.get(conf); - this.indexer = indexer; nextInputSplit(); this.value = new Text(); - if (snapshot != null) { - if (currentSplitIndex < snapshot.size()) { - indexer.reset(this); - } - } } @Override @@ -177,16 +154,6 @@ public class HDFSInputStream extends AsterixInputStream implements IIndexingData continue; } } - if (snapshot != null) { - String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath(); - FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName)); - // Skip if not the same file stored in the files snapshot - if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime() - .getTime()) { - continue; - } - } - reader.close(); reader = getRecordReader(currentSplitIndex); return true; @@ -202,33 +169,6 @@ public class HDFSInputStream extends AsterixInputStream implements IIndexingData key = reader.createKey(); value = reader.createValue(); } - if (indexer != null) { - try { - indexer.reset(this); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } - return reader; - } - - @Override - public IExternalIndexer getIndexer() { - return indexer; - } - - @Override - public List<ExternalFile> getSnapshot() { - return snapshot; - } - - @Override - public int getCurrentSplitIndex() { - return currentSplitIndex; - } - - @Override - public RecordReader<?, ? extends Writable> getReader() { return reader; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java index 58ef2a43be..cde0266a38 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java @@ -85,11 +85,6 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory { return DataSourceType.STREAM; } - @Override - public boolean isIndexible() { - return false; - } - @Override public void configure(IServiceContext serviceCtx, Map<String, String> configuration, IWarningCollector warningCollector) throws AsterixException { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java index 1bd08d94ae..d628062401 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java @@ -83,9 +83,4 @@ public class SocketServerInputStreamFactory implements IInputStreamFactory { public DataSourceType getDataSourceType() { return DataSourceType.STREAM; } - - @Override - public boolean isIndexible() { - return false; - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java index 2b0bb55c27..ab7fa777f6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java @@ -91,11 +91,6 @@ public class TwitterFirehoseStreamFactory implements IInputStreamFactory { this.configuration = configuration; } - @Override - public boolean isIndexible() { - return false; - } - @Override public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException { try { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index f60ecdcc62..c5315094c9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -28,7 +28,6 @@ import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IDataFlowController; import org.apache.asterix.external.api.IDataParserFactory; import org.apache.asterix.external.api.IExternalDataSourceFactory; -import org.apache.asterix.external.api.IIndexingDatasource; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordDataParserFactory; @@ -43,7 +42,6 @@ import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController import org.apache.asterix.external.dataflow.FeedRecordDataFlowController; import org.apache.asterix.external.dataflow.FeedStreamDataFlowController; import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController; -import org.apache.asterix.external.dataflow.IndexingDataFlowController; import org.apache.asterix.external.dataflow.RecordDataFlowController; import org.apache.asterix.external.dataflow.StreamDataFlowController; import org.apache.asterix.external.util.ExternalDataUtils; @@ -61,7 +59,7 @@ public class DataflowControllerProvider { @SuppressWarnings({ "rawtypes", "unchecked" }) public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx, int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory, - Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager) + Map<String, String> configuration, boolean isFeed, FeedLogManager feedLogManager) throws HyracksDataException { try { switch (dataSourceFactory.getDataSourceType()) { @@ -72,10 +70,7 @@ public class DataflowControllerProvider { IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx); // TODO(ali): revisit to think about passing data source name via setter or via createRecordParser dataParser.configure(recordReader.getDataSourceName(), recordReader.getLineNumber()); - if (indexingOp) { - return new IndexingDataFlowController(ctx, dataParser, recordReader, - ((IIndexingDatasource) recordReader).getIndexer()); - } else if (isFeed) { + if (isFeed) { boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration); boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration); if (isRecordWithMeta) { diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java index 31a9c51007..05d53b17ce 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java @@ -109,7 +109,6 @@ public interface Statement extends ILangExpression { ANALYZE, ANALYZE_DROP, COMPACT, - EXTERNAL_DATASET_REFRESH, SUBSCRIBE_FEED, EXTENSION, } diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/RefreshExternalDatasetStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/RefreshExternalDatasetStatement.java deleted file mode 100644 index 30a02fb5a4..0000000000 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/RefreshExternalDatasetStatement.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.lang.common.statement; - -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.metadata.DataverseName; -import org.apache.asterix.lang.common.base.AbstractStatement; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.lang.common.visitor.base.ILangVisitor; - -public class RefreshExternalDatasetStatement extends AbstractStatement { - - private DataverseName dataverseName; - private Identifier datasetName; - - public Identifier getDatasetName() { - return datasetName; - } - - public void setDatasetName(Identifier datasetName) { - this.datasetName = datasetName; - } - - public DataverseName getDataverseName() { - return dataverseName; - } - - public void setDataverseName(DataverseName dataverseName) { - this.dataverseName = dataverseName; - } - - @Override - public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { - return null; - } - - @Override - public Kind getKind() { - return Statement.Kind.EXTERNAL_DATASET_REFRESH; - } - - @Override - public byte getCategory() { - return Category.UPDATE; - } - -} diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index 1c4c9773a4..5ae1eb5aa8 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -162,7 +162,6 @@ import org.apache.asterix.lang.common.statement.LoadStatement; import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; import org.apache.asterix.lang.common.statement.NodegroupDecl; import org.apache.asterix.lang.common.statement.Query; -import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement; import org.apache.asterix.lang.common.statement.SetStatement; import org.apache.asterix.lang.common.statement.SynonymDropStatement; import org.apache.asterix.lang.common.statement.TypeDecl; @@ -939,7 +938,6 @@ Statement SingleStatement() throws ParseException: | stmt = CompactStatement() | stmt = AnalyzeStatement() | stmt = Query() - | stmt = RefreshExternalDatasetStatement() ) { return stmt; @@ -1264,22 +1262,6 @@ void DatasetRecordField(RecordTypeDefinition recType) throws ParseException: } } -RefreshExternalDatasetStatement RefreshExternalDatasetStatement() throws ParseException: -{ - Token startToken = null; - Pair<DataverseName,Identifier> nameComponents = null; - String datasetName = null; -} -{ - <REFRESH> { startToken = token; } <EXTERNAL> Dataset() nameComponents = QualifiedName() - { - RefreshExternalDatasetStatement stmt = new RefreshExternalDatasetStatement(); - stmt.setDataverseName(nameComponents.first); - stmt.setDatasetName(nameComponents.second); - return addSourceLocation(stmt, startToken); - } -} - CreateIndexStatement CreateIndexStatement(Token startStmtToken) throws ParseException: { CreateIndexStatement stmt = null; @@ -5638,7 +5620,6 @@ TOKEN [IGNORE_CASE]: | <PRESORTED : "pre-sorted"> | <PRIMARY : "primary"> | <RAW : "raw"> - | <REFRESH : "refresh"> | <RETURN : "return"> | <RETURNING : "returning"> | <RIGHT : "right"> diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java index 91edc4c63c..88e96e73ab 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java @@ -142,7 +142,7 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider { return primaryTypeTraits; } else if (dataset.getDatasetType() == DatasetType.EXTERNAL && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { - return null; + return new ITypeTraits[0]; } Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails(); int numPrimaryKeys = dataset.getPrimaryKeys().size(); @@ -175,7 +175,7 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider { return dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType); } else if (dataset.getDatasetType() == DatasetType.EXTERNAL && index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { - return null; + return new IBinaryComparatorFactory[0]; } Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails(); int numPrimaryKeys = dataset.getPrimaryKeys().size(); @@ -210,7 +210,7 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider { if (dataset.getDatasetType() == DatasetType.EXTERNAL && index.getIndexType() != DatasetConfig.IndexType.SAMPLE) { if (index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) { - return null; + return new int[0]; } else { Index.ValueIndexDetails indexDetails = ((Index.ValueIndexDetails) index.getIndexDetails()); return new int[] { indexDetails.getKeyFieldNames().size() }; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 409f0e73dc..550864485e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -36,7 +35,6 @@ import java.util.stream.Collectors; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.context.IStorageComponentProvider; @@ -59,8 +57,6 @@ import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUt import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory; import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor; import org.apache.asterix.external.provider.AdapterFactoryProvider; @@ -893,27 +889,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> try { configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE, dataset.getDataverseName().getCanonicalForm()); - ITypedAdapterFactory adapterFactory = - AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName, - configuration, itemType, metaType, warningCollector); - - // check to see if dataset is indexed - Index filesIndex = - MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), - dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); - - if (filesIndex != null && filesIndex.getPendingOp() == 0) { - // get files - List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); - Iterator<ExternalFile> iterator = files.iterator(); - while (iterator.hasNext()) { - if (iterator.next().getPendingOp() != ExternalFilePendingOp.NO_OP) { - iterator.remove(); - } - } - } - - return adapterFactory; + return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName, + configuration, itemType, metaType, warningCollector); } catch (Exception e) { throw new AlgebricksException("Unable to create adapter", e); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java deleted file mode 100644 index 4d8bacf15c..0000000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.lock; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.utils.ExternalDatasetAccessManager; - -/** - * This is a singelton class used to maintain the version of each external dataset with indexes - * It should be consolidated once a better global dataset lock management is introduced. - * - * @author alamouda - */ -public class ExternalDatasetsRegistry { - public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry(); - private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister; - - private ExternalDatasetsRegistry() { - globalRegister = new ConcurrentHashMap<>(); - } - - /** - * Get the current version of the dataset - * - * @param dataset - * @return - */ - public int getDatasetVersion(Dataset dataset) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); - if (datasetAccessMgr == null) { - globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager()); - datasetAccessMgr = globalRegister.get(key); - } - return datasetAccessMgr.getVersion(); - } - - public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) { - - Map<String, Integer> locks; - String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName(); - // check first if the lock was aquired already - locks = metadataProvider.getExternalDataLocks(); - if (locks == null) { - locks = new HashMap<>(); - metadataProvider.setExternalDataLocks(locks); - } else { - // if dataset was accessed already by this job, return the registered version - Integer version = locks.get(lockKey); - if (version != null) { - return version; - } - } - - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(lockKey); - if (datasetAccessMgr == null) { - globalRegister.putIfAbsent(lockKey, new ExternalDatasetAccessManager()); - datasetAccessMgr = globalRegister.get(lockKey); - } - - // aquire the correct lock - int version = datasetAccessMgr.queryBegin(); - locks.put(lockKey, version); - return version; - } - - public void refreshBegin(Dataset dataset) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); - if (datasetAccessMgr == null) { - datasetAccessMgr = globalRegister.put(key, new ExternalDatasetAccessManager()); - } - // aquire the correct lock - datasetAccessMgr.refreshBegin(); - } - - public void removeDatasetInfo(Dataset dataset) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - globalRegister.remove(key); - } - - public void refreshEnd(Dataset dataset, boolean success) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - globalRegister.get(key).refreshEnd(success); - } - - public void buildIndexBegin(Dataset dataset, boolean firstIndex) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); - if (datasetAccessMgr == null) { - globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager()); - datasetAccessMgr = globalRegister.get(key); - } - // aquire the correct lock - datasetAccessMgr.buildIndexBegin(firstIndex); - } - - public void buildIndexEnd(Dataset dataset, boolean firstIndex) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - globalRegister.get(key).buildIndexEnd(firstIndex); - } - - public void releaseAcquiredLocks(MetadataProvider metadataProvider) { - Map<String, Integer> locks = metadataProvider.getExternalDataLocks(); - if (locks == null) { - return; - } else { - // if dataset was accessed already by this job, return the registered version - Set<Entry<String, Integer>> aquiredLocks = locks.entrySet(); - for (Entry<String, Integer> entry : aquiredLocks) { - ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey()); - if (accessManager != null) { - accessManager.queryEnd(entry.getValue()); - } - } - locks.clear(); - } - } -} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 083b9641a6..79bf3e8454 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -43,7 +43,6 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; @@ -319,21 +318,6 @@ public class DatasetUtil { return specPrimary; } - public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset) - throws AlgebricksException { - String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName()); - JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, indexName); - IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); - IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, indexHelperFactory); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, - splitsAndConstraint.second); - spec.addRoot(btreeDrop); - return spec; - } - public static JobSpecification createDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider) throws AlgebricksException { Index index = IndexUtil.getPrimaryIndex(dataset); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetAccessManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetAccessManager.java deleted file mode 100644 index b94acf3988..0000000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetAccessManager.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.utils; - -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class ExternalDatasetAccessManager { - // a version to indicate the current version of the dataset - private int version; - // a lock to allow concurrent build index operation and serialize refresh operations - private ReentrantReadWriteLock datasetLock; - // a lock per version of the dataset to keep a version alive while queries are still assigned to it - private ReentrantReadWriteLock v0Lock; - private ReentrantReadWriteLock v1Lock; - - public ExternalDatasetAccessManager() { - this.version = 0; - this.v0Lock = new ReentrantReadWriteLock(false); - this.v1Lock = new ReentrantReadWriteLock(false); - this.datasetLock = new ReentrantReadWriteLock(true); - } - - public int getVersion() { - return version; - } - - public void setVersion(int version) { - this.version = version; - } - - public ReentrantReadWriteLock getV0Lock() { - return v0Lock; - } - - public void setV0Lock(ReentrantReadWriteLock v0Lock) { - this.v0Lock = v0Lock; - } - - public ReentrantReadWriteLock getV1Lock() { - return v1Lock; - } - - public void setV1Lock(ReentrantReadWriteLock v1Lock) { - this.v1Lock = v1Lock; - } - - public int refreshBegin() { - datasetLock.writeLock().lock(); - if (version == 0) { - v1Lock.writeLock().lock(); - } else { - v0Lock.writeLock().lock(); - } - return version; - } - - public void refreshEnd(boolean success) { - if (version == 0) { - v1Lock.writeLock().unlock(); - if (success) { - version = 1; - } - } else { - v0Lock.writeLock().unlock(); - if (success) { - version = 0; - } - } - datasetLock.writeLock().unlock(); - } - - public synchronized int buildIndexBegin(boolean isFirstIndex) { - if (isFirstIndex) { - datasetLock.writeLock().lock(); - } else { - datasetLock.readLock().lock(); - } - return version; - } - - public void buildIndexEnd(boolean isFirstIndex) { - if (isFirstIndex) { - datasetLock.writeLock().unlock(); - } else { - datasetLock.readLock().unlock(); - } - } - - public int queryBegin() { - if (version == 0) { - v0Lock.readLock().lock(); - return 0; - } else { - v1Lock.readLock().lock(); - return 1; - } - } - - public void queryEnd(int version) { - if (version == 0) { - v0Lock.readLock().unlock(); - } else { - v1Lock.readLock().unlock(); - } - } -} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 4580a8db36..808b8a4578 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -174,9 +174,6 @@ public class IndexUtil { secondaryIndexHelper = SecondaryTreeIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); } - if (files != null) { - ((SecondaryIndexOperationsHelper) secondaryIndexHelper).setExternalFiles(files); - } return secondaryIndexHelper.buildLoadingJobSpec(); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 25b138dbd9..ea7da19bf9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -29,7 +29,6 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.formats.nontagged.BinaryBooleanInspector; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; @@ -119,7 +118,6 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO protected int[] primaryFilterFields; protected int[] primaryBTreeFields; protected int[] secondaryBTreeFields; - protected List<ExternalFile> externalFiles; protected int numPrimaryKeys; protected final SourceLocation sourceLoc; protected final int sortNumFrames; @@ -519,10 +517,6 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO new RecordDescriptor[] { secondaryRecDesc }); } - public void setExternalFiles(List<ExternalFile> externalFiles) { - this.externalFiles = externalFiles; - } - @Override public RecordDescriptor getSecondaryRecDesc() { return secondaryRecDesc;