Yingyi Bu has submitted this change and it was merged. Change subject: ASTERIXDB-1494: refactor ExternalLibraryManager and make it non-static. ......................................................................
ASTERIXDB-1494: refactor ExternalLibraryManager and make it non-static. Change-Id: I5c287a35ff90c3aea639d3069d5a842e28c5e508 Reviewed-on: https://asterix-gerrit.ics.uci.edu/980 Reviewed-by: abdullah alamoudi <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/TestLibrarian.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RepeatedTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java 32 files changed, 337 insertions(+), 182 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java index 7db93e8..0fb6be7 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java @@ -124,7 +124,7 @@ @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { UnnestMapOperator unnestMap = (UnnestMapOperator) op; ILogicalExpression expr = unnestMap.getExpressionRef().getValue(); if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { @@ -145,7 +145,7 @@ } AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider(); - Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> externalLoopup = AqlMetadataProvider + Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> externalLoopup = metadataProvider .buildExternalDataLookupRuntime(builder.getJobSpec(), dataset, secondaryIndex, ridIndexes, retainInput, typeEnv, outputVars, opSchema, context, metadataProvider, retainNull); builder.contributeHyracksOperator(unnestMap, externalLoopup.first); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java index 5b3e453..1545325 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java @@ -43,6 +43,7 @@ import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; @@ -52,6 +53,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.external.feed.management.FeedManager; +import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.api.IAsterixStateProxy; @@ -126,6 +128,8 @@ private IRemoteRecoveryManager remoteRecoveryManager; private IReplicaResourcesManager replicaResourcesManager; private final int metadataRmiPort; + + private ILibraryManager libraryManager; public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) throws AsterixException { @@ -265,6 +269,11 @@ lccm.register((ILifeCycleComponent) datasetLifecycleManager); lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager()); lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager()); + + /** + * Initializes the library manager. + */ + libraryManager = new ExternalLibraryManager(); } @Override @@ -407,13 +416,18 @@ } @Override + public ILibraryManager getLibraryManager() { + return libraryManager; + } + + @Override public void initializeResourceIdFactory() throws HyracksDataException { resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory(); } @Override public void initializeMetadata(boolean newUniverse) throws Exception { - IAsterixStateProxy proxy = null; + IAsterixStateProxy proxy; if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Bootstrapping metadata"); } @@ -446,4 +460,5 @@ public void unexportMetadataNodeStub() throws RemoteException { UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false); } + } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java index d4c3b0e..6922379 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java @@ -258,12 +258,13 @@ * @throws Exception */ private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator( - JobSpecification jobSpec, IAType itemType, Dataset dataset, List<ExternalFile> files, - RecordDescriptor indexerDesc) throws AsterixException { + AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset, + List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException { ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); Map<String, String> configuration = externalDatasetDetails.getProperties(); IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory( - externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null); + metadataProvider.getLibraryManager(), externalDatasetDetails.getAdapter(), configuration, + (ARecordType) itemType, files, true, null); return new Pair<>(new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory), adapterFactory.getPartitionConstraint()); } @@ -274,7 +275,7 @@ if (files == null) { files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset); } - return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc); + return getExternalDataIndexingOperator(metadataProvider, spec, itemType, dataset, files, indexerDesc); } /** diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index 04aebf5..92ef062 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -35,10 +35,10 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.external.library.ExternalLibrary; -import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.external.library.LibraryAdapter; import org.apache.asterix.external.library.LibraryFunction; import org.apache.asterix.metadata.MetadataManager; @@ -52,10 +52,13 @@ public class ExternalLibraryUtils { - private static Logger LOGGER = Logger.getLogger(ExternalLibraryUtils.class.getName()); + private static final Logger LOGGER = Logger.getLogger(ExternalLibraryUtils.class.getName()); - public static void setUpExternaLibraries(boolean isMetadataNode) throws Exception { + private ExternalLibraryUtils() { + } + public static void setUpExternaLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode) + throws Exception { // start by un-installing removed libraries (Metadata Node only) Map<String, List<String>> uninstalledLibs = null; if (isMetadataNode) { @@ -72,7 +75,7 @@ String[] libraries = dataverseDir.list(); for (String library : libraries) { // for each file (library), register library - registerLibrary(dataverse, library, isMetadataNode, installLibDir); + registerLibrary(externalLibraryManager, dataverse, library); // is metadata node? if (isMetadataNode) { // get library file @@ -88,6 +91,7 @@ /** * un-install libraries. + * * @return a map from dataverse -> list of uninstalled libraries. * @throws Exception */ @@ -126,6 +130,7 @@ * TODO Currently, external libraries only include functions and adapters. we need to extend this to include: * 1. external data source * 2. data parser + * * @param dataverse * @param libraryName * @return true if the library was found and removed, false otherwise @@ -181,9 +186,9 @@ } /** - * Each element of a library is installed as part of a transaction. Any - * failure in installing an element does not effect installation of other - * libraries. + * Each element of a library is installed as part of a transaction. Any + * failure in installing an element does not effect installation of other + * libraries. */ protected static void installLibraryIfNeeded(String dataverse, final File libraryDir, Map<String, List<String>> uninstalledLibs) throws Exception { @@ -284,22 +289,23 @@ /** * register the library class loader with the external library manager + * * @param dataverse * @param libraryName - * @param isMetadataNode * @param installLibDir * @throws Exception */ - protected static void registerLibrary(String dataverse, String libraryName, boolean isMetadataNode, - File installLibDir) throws Exception { + protected static void registerLibrary(ILibraryManager externalLibraryManager, String dataverse, String libraryName) + throws Exception { // get the class loader ClassLoader classLoader = getLibraryClassLoader(dataverse, libraryName); // register it with the external library manager - ExternalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader); + externalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader); } /** * Get the library from the xml file + * * @param libraryXMLPath * @return * @throws Exception @@ -313,6 +319,7 @@ /** * Get the class loader for the library + * * @param dataverse * @param libraryName * @return @@ -382,7 +389,7 @@ } /** - * @return the directory "$(pwd)/library": This needs to be improved + * @return the directory "$(pwd)/library": This needs to be improved */ protected static File getLibraryInstallDir() { String workingDir = System.getProperty("user.dir"); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java index 95afc5b..295b308 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java @@ -2003,7 +2003,7 @@ default: throw new IllegalStateException(); } - FeedMetadataUtil.validateFeed(feed, mdTxnCtx); + FeedMetadataUtil.validateFeed(feed, mdTxnCtx, metadataProvider.getLibraryManager()); MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index 4c42594..06ec6bc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -40,10 +40,12 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.config.AsterixExternalProperties; import org.apache.asterix.common.config.AsterixMetadataProperties; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.utils.ServletUtil.Servlets; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.SqlppCompilationProvider; import org.apache.asterix.event.service.ILookupService; +import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.api.IAsterixStateProxy; @@ -87,8 +89,10 @@ appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager())); GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager((HyracksConnection) getNewHyracksClientConnection()); - - AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.INSTANCE); + ILibraryManager libraryManager = new ExternalLibraryManager(); + ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); + AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.INSTANCE, + libraryManager); proxy = AsterixStateProxy.registerRemoteObject(); appCtx.setDistributedState(proxy); @@ -107,7 +111,7 @@ setupFeedServer(externalProperties); feedServer.start(); - ExternalLibraryUtils.setUpExternaLibraries(false); + ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE); ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index c71d77e..2555b5a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -79,12 +79,11 @@ @Override public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception { CmdLineParser parser = new CmdLineParser(this); - try { parser.parseArgument(args); } catch (CmdLineException e) { - System.err.println(e.getMessage()); - System.err.println("Usage:"); + LOGGER.severe(e.getMessage()); + LOGGER.severe("Usage:"); parser.printUsage(System.err); throw e; } @@ -211,7 +210,8 @@ if (isMetadataNode && !pendingFailbackCompletion) { runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE); } - ExternalLibraryUtils.setUpExternaLibraries(isMetadataNode && !pendingFailbackCompletion); + ExternalLibraryUtils.setUpExternaLibraries(runtimeContext.getLibraryManager(), + isMetadataNode && !pendingFailbackCompletion); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting lifecycle components"); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/TestLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/TestLibrarian.java index 3416a77..ba34024 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/TestLibrarian.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/TestLibrarian.java @@ -26,8 +26,8 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.event.service.AsterixEventServiceUtil; -import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.test.aql.ITestLibrarian; import org.apache.commons.io.FileUtils; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -36,6 +36,14 @@ public class TestLibrarian implements ITestLibrarian { public static final String LIBRARY_DIR_NAME = "library"; + + // The following list includes a library manager for the CC + // and library managers for NCs (one-per-NC). + private final List<ILibraryManager> libraryManagers; + + public TestLibrarian(List<ILibraryManager> libraryManagers) { + this.libraryManagers = libraryManagers; + } @Override public void install(String dvName, String libName, String libPath) throws Exception { @@ -56,8 +64,10 @@ throw new Exception("Couldn't unzip the file: " + libPath, e); } - // for each file (library), register library - ExternalLibraryUtils.registerLibrary(dvName, libName, true, destinationDir); + + for (ILibraryManager libraryManager : libraryManagers) { + ExternalLibraryUtils.registerLibrary(libraryManager, dvName, libName); + } // get library file // install if needed (add functions, adapters, datasources, parsers to the metadata) // <Not required for use> @@ -67,7 +77,9 @@ @Override public void uninstall(String dvName, String libName) throws RemoteException, AsterixException, ACIDException { ExternalLibraryUtils.uninstallLibrary(dvName, libName); - ExternalLibraryManager.deregisterLibraryClassLoader(dvName, libName); + for (ILibraryManager libraryManager : libraryManagers) { + libraryManager.deregisterLibraryClassLoader(dvName, libName); + } } public static void removeLibraryDir() throws IOException { @@ -78,11 +90,16 @@ FileUtils.deleteQuietly(installLibDir); } - public static void cleanup() throws AsterixException, RemoteException, ACIDException { - List<Pair<String, String>> libs = ExternalLibraryManager.getAllLibraries(); - for (Pair<String, String> dvAndLib : libs) { - ExternalLibraryUtils.uninstallLibrary(dvAndLib.first, dvAndLib.second); - ExternalLibraryManager.deregisterLibraryClassLoader(dvAndLib.first, dvAndLib.second); + public void cleanup() throws AsterixException, RemoteException, ACIDException { + for (ILibraryManager libraryManager : libraryManagers) { + List<Pair<String, String>> libs = libraryManager.getAllLibraries(); + for (Pair<String, String> dvAndLib : libs) { + ExternalLibraryUtils.uninstallLibrary(dvAndLib.first, dvAndLib.second); + libraryManager.deregisterLibraryClassLoader(dvAndLib.first, dvAndLib.second); + } } + // get the directory of the to be installed libraries + File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); + FileUtils.deleteQuietly(installLibDir); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java index dfb61c2..baa3923 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java @@ -21,10 +21,12 @@ import java.io.File; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.logging.Logger; import org.apache.asterix.app.external.TestLibrarian; import org.apache.asterix.common.config.AsterixTransactionProperties; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.test.aql.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; import org.apache.commons.lang3.StringUtils; @@ -49,9 +51,10 @@ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; protected static AsterixTransactionProperties txnProperties; - private static final TestExecutor testExecutor = new TestExecutor(); + protected static final TestExecutor testExecutor = new TestExecutor(); private static final boolean cleanupOnStart = true; private static final boolean cleanupOnStop = true; + private static TestLibrarian librarian; @BeforeClass public static void setUp() throws Exception { @@ -60,8 +63,9 @@ outdir.mkdirs(); // remove library directory TestLibrarian.removeLibraryDir(); - testExecutor.setLibrarian(new TestLibrarian()); - ExecutionTestUtil.setUp(cleanupOnStart); + List<ILibraryManager> libraryManagers = ExecutionTestUtil.setUp(cleanupOnStart); + librarian = new TestLibrarian(libraryManagers); + testExecutor.setLibrarian(librarian); } catch (Throwable th) { th.printStackTrace(); throw th; @@ -102,7 +106,7 @@ @Test public void test() throws Exception { - TestLibrarian.cleanup(); + librarian.cleanup(); testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java index d919c92..813def3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java @@ -20,18 +20,21 @@ import java.io.File; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.IdentitiyResolverFactory; +import org.apache.asterix.om.util.AsterixAppContextInfo; import org.apache.asterix.testframework.xml.TestGroup; import org.apache.asterix.testframework.xml.TestSuite; import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.hyracks.storage.common.buffercache.BufferCache; public class ExecutionTestUtil { @@ -43,7 +46,7 @@ protected static TestGroup FailedGroup; - public static void setUp(boolean cleanup) throws Exception { + public static List<ILibraryManager> setUp(boolean cleanup) throws Exception { System.out.println("Starting setup"); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting setup"); @@ -69,16 +72,17 @@ FailedGroup = new TestGroup(); FailedGroup.setName("failed"); - } - private static void validateBufferCacheState() { + List<ILibraryManager> libraryManagers = new ArrayList<>(); + // Adds the library manager for CC. + libraryManagers.add(AsterixAppContextInfo.getInstance().getLibraryManager()); + // Adds library managers for NCs, one-per-NC. for (NodeControllerService nc : AsterixHyracksIntegrationUtil.ncs) { - IAsterixAppRuntimeContext appCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext() + IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext() .getApplicationObject(); - if (!((BufferCache) appCtx.getBufferCache()).isClean()) { - throw new IllegalStateException(); - } + libraryManagers.add(runtimeCtx.getLibraryManager()); } + return libraryManagers; } public static void tearDown(boolean cleanup) throws Exception { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RepeatedTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RepeatedTest.java index b577d29..7d39b31 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RepeatedTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RepeatedTest.java @@ -23,8 +23,6 @@ import java.lang.annotation.Target; import java.util.Collection; -import org.apache.asterix.app.external.TestLibrarian; -import org.apache.asterix.test.aql.TestExecutor; import org.apache.asterix.test.runtime.RepeatRule.Repeat; import org.apache.asterix.testframework.context.TestCaseContext; import org.junit.Rule; @@ -84,7 +82,6 @@ public class RepeatedTest extends ExecutionTest { private int count; - private final TestExecutor testExecutor = new TestExecutor(); @Parameters(name = "RepeatedTest {index}: {0}") public static Collection<Object[]> tests() throws Exception { @@ -94,7 +91,6 @@ public RepeatedTest(TestCaseContext tcCtx) { super(tcCtx); - testExecutor.setLibrarian(new TestLibrarian()); count = 0; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java index ee9ed4a..100da63 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; @@ -83,6 +84,8 @@ public IReplicationChannel getReplicationChannel(); + public ILibraryManager getLibraryManager(); + public void initializeResourceIdFactory() throws HyracksDataException; /** diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java index 9647452..57fb335 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.common.IStorageManagerInterface; @@ -50,5 +51,13 @@ */ public ICCApplicationContext getCCApplicationContext(); + /** + * @return the global recovery manager. + */ public IGlobalRecoveryMaanger getGlobalRecoveryManager(); + + /** + * @return the library manager (at CC side). + */ + public ILibraryManager getLibraryManager(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java new file mode 100644 index 0000000..5dca73e --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.common.library; + +import java.util.List; + +import org.apache.hyracks.algebricks.common.utils.Pair; + +public interface ILibraryManager { + + /** + * Registers the library class loader with the external library manager. + * <code>dataverseName</code> and <code>libraryName</code> uniquely identifies a class loader. + * + * @param dataverseName + * @param libraryName + * @param classLoader + */ + public void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader); + + /** + * @return all registered libraries. + */ + public List<Pair<String, String>> getAllLibraries(); + + /** + * De-registers a library class loader. + * + * @param dataverseName + * @param libraryName + */ + public void deregisterLibraryClassLoader(String dataverseName, String libraryName); + + /** + * Finds a class loader for a given pair of dataverse name and library name. + * + * @param dataverseName + * @param libraryName + * @return the library class loader associated with the dataverse and library. + */ + public ClassLoader getLibraryClassLoader(String dataverseName, String libraryName); +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 22f046f..f4e67f3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -21,7 +21,9 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataFlowController; import org.apache.asterix.external.api.IDataParserFactory; @@ -83,8 +85,10 @@ @Override public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException { + IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext() + .getApplicationContext().getApplicationObject(); try { - restoreExternalObjects(); + restoreExternalObjects(runtimeCtx.getLibraryManager()); } catch (AsterixException e) { throw new HyracksDataException(e); } @@ -94,9 +98,8 @@ } feedLogManager.touch(); } - IDataFlowController controller = - DataflowControllerProvider.getDataflowController(recordType, ctx, partition, - dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager); + IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition, + dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager); if (isFeed) { return new FeedAdapter((AbstractFeedDataFlowController) controller); } else { @@ -104,9 +107,9 @@ } } - private void restoreExternalObjects() throws AsterixException { + private void restoreExternalObjects(ILibraryManager libraryManager) throws AsterixException { if (dataSourceFactory == null) { - dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration); + dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration); // create and configure parser factory if (dataSourceFactory.isIndexible() && (files != null)) { ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp); @@ -115,7 +118,7 @@ } if (dataParserFactory == null) { // create and configure parser factory - dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration); + dataParserFactory = ParserFactoryProvider.getDataParserFactory(libraryManager, configuration); dataParserFactory.setRecordType(recordType); dataParserFactory.setMetaType(metaType); dataParserFactory.configure(configuration); @@ -123,17 +126,16 @@ } @Override - public void configure(Map<String, String> configuration) - throws AsterixException { + public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException { this.configuration = configuration; ExternalDataUtils.validateDataSourceParameters(configuration); - dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration); + dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration); if (dataSourceFactory.isIndexible() && (files != null)) { ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp); } dataSourceFactory.configure(configuration); ExternalDataUtils.validateDataParserParameters(configuration); - dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration); + dataParserFactory = ParserFactoryProvider.getDataParserFactory(libraryManager, configuration); dataParserFactory.setRecordType(recordType); dataParserFactory.setMetaType(metaType); dataParserFactory.configure(configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java index fa6897f..01fcfc2 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.ILookupReaderFactory; import org.apache.asterix.external.api.ILookupRecordReader; import org.apache.asterix.external.api.IRecordDataParser; @@ -76,10 +77,11 @@ } } - public void configure(Map<String, String> configuration) throws AsterixException { + public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException { this.configuration = configuration; readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration); - dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration); + dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(libraryManager, + configuration); dataParserFactory.setRecordType(recordType); readerFactory.configure(configuration); dataParserFactory.configure(configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java index 505c4b2..2d42ba9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -65,11 +66,11 @@ public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException; /** + * @param libraryManager * @param configuration * @throws Exception */ - public void configure(Map<String, String> configuration) - throws AsterixException; + public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException; public void setOutputType(ARecordType outputType); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java index 7f416c7..b0594d2 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java @@ -20,7 +20,9 @@ import java.io.IOException; +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IExternalFunction; import org.apache.asterix.external.api.IFunctionFactory; import org.apache.asterix.external.api.IFunctionHelper; @@ -29,6 +31,7 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.EnumDeserializer; import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.asterix.om.util.AsterixAppContextInfo; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -64,7 +67,17 @@ String[] fnameComponents = finfo.getFunctionIdentifier().getName().split("#"); String functionLibary = fnameComponents[0]; String dataverse = finfo.getFunctionIdentifier().getNamespace(); - ClassLoader libraryClassLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, functionLibary); + ILibraryManager libraryManager; + if (context == null) { + // Gets the library manager for compile-time constant folding. + libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager(); + } else { + // Gets the library manager for real runtime evaluation. + IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) context.getJobletContext() + .getApplicationContext().getApplicationObject(); + libraryManager = runtimeCtx.getLibraryManager(); + } + ClassLoader libraryClassLoader = libraryManager.getLibraryClassLoader(dataverse, functionLibary); String classname = finfo.getFunctionBody().trim(); Class<?> clazz; try { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java index 0283bf2..3373f70 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java @@ -24,19 +24,15 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.hyracks.algebricks.common.utils.Pair; -public class ExternalLibraryManager { +public class ExternalLibraryManager implements ILibraryManager { - private static final Map<String, ClassLoader> libraryClassLoaders = new HashMap<String, ClassLoader>(); + private final Map<String, ClassLoader> libraryClassLoaders = new HashMap<>(); - /** - * Register the library class loader with the external library manager - * @param dataverseName - * @param libraryName - * @param classLoader - */ - public static void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader) { + @Override + public void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader) { String key = getKey(dataverseName, libraryName); synchronized (libraryClassLoaders) { if (libraryClassLoaders.get(key) != null) { @@ -46,7 +42,8 @@ } } - public static List<Pair<String, String>> getAllLibraries() { + @Override + public List<Pair<String, String>> getAllLibraries() { ArrayList<Pair<String, String>> libs = new ArrayList<>(); synchronized (libraryClassLoaders) { for (Entry<String, ClassLoader> entry : libraryClassLoaders.entrySet()) { @@ -56,7 +53,8 @@ return libs; } - public static void deregisterLibraryClassLoader(String dataverseName, String libraryName) { + @Override + public void deregisterLibraryClassLoader(String dataverseName, String libraryName) { String key = getKey(dataverseName, libraryName); synchronized (libraryClassLoaders) { if (libraryClassLoaders.get(key) != null) { @@ -65,7 +63,8 @@ } } - public static ClassLoader getLibraryClassLoader(String dataverseName, String libraryName) { + @Override + public ClassLoader getLibraryClassLoader(String dataverseName, String libraryName) { String key = getKey(dataverseName, libraryName); return libraryClassLoaders.get(key); } @@ -75,10 +74,10 @@ } private static Pair<String, String> getDataverseAndLibararyName(String key) { - int index = key.indexOf("."); + int index = key.indexOf('.'); String dataverse = key.substring(0, index); String library = key.substring(index + 1); - return new Pair<String, String>(dataverse, library); + return new Pair<>(dataverse, library); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 58e9ea7..69aa59a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -21,11 +21,12 @@ import java.util.Map; import java.util.logging.Logger; +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.external.feed.management.FeedId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -94,24 +95,26 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { if (adaptorFactory == null) { - try { - adaptorFactory = createExternalAdapterFactory(ctx, partition); - } catch (Exception exception) { - throw new HyracksDataException(exception); - } + adaptorFactory = createExternalAdapterFactory(ctx); } return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, policyAccessor, recordDescProvider, this); } - private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception { - IAdapterFactory adapterFactory = null; - ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(), - adaptorLibraryName); + private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException { + IAdapterFactory adapterFactory; + IAsterixAppRuntimeContext runtimeCtx = + (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); + ILibraryManager libraryManager = runtimeCtx.getLibraryManager(); + ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName); if (classLoader != null) { - adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance())); - adapterFactory.setOutputType(adapterOutputType); - adapterFactory.configure(adaptorConfiguration); + try { + adapterFactory = (IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()); + adapterFactory.setOutputType(adapterOutputType); + adapterFactory.configure(libraryManager, adaptorConfiguration); + } catch (Exception e) { + throw new HyracksDataException(e); + } } else { String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName + " in dataverse " + feedId.getDataverse(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java index 4f699fc..7465455 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java @@ -35,6 +35,7 @@ @Override public void configure(Map<String, String> configuration) { + // Nothing to be configured. } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java index f20f802..d6e536d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java @@ -35,6 +35,7 @@ @Override public void configure(Map<String, String> configuration) { + // Nothing to be configured. } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java index 8f6c85f..4cc8e33 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; import org.apache.asterix.external.api.IAdapterFactory; @@ -37,18 +38,18 @@ public class AdapterFactoryProvider { // Adapters - public static IAdapterFactory getAdapterFactory(String adapterName, Map<String, String> configuration, - ARecordType itemType, ARecordType metaType) throws AsterixException { + public static IAdapterFactory getAdapterFactory(ILibraryManager libraryManager, String adapterName, + Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AsterixException { ExternalDataCompatibilityUtils.prepare(adapterName, configuration); GenericAdapterFactory adapterFactory = new GenericAdapterFactory(); adapterFactory.setOutputType(itemType); adapterFactory.setMetaType(metaType); - adapterFactory.configure(configuration); + adapterFactory.configure(libraryManager, configuration); return adapterFactory; } // Indexing Adapters - public static IIndexingAdapterFactory getIndexingAdapterFactory(String adapterName, + public static IIndexingAdapterFactory getIndexingAdapterFactory(ILibraryManager libraryManager, String adapterName, Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp, ARecordType metaType) throws AsterixException { ExternalDataCompatibilityUtils.prepare(adapterName, configuration); @@ -56,17 +57,17 @@ adapterFactory.setOutputType(itemType); adapterFactory.setMetaType(metaType); adapterFactory.setSnapshot(snapshot, indexingOp); - adapterFactory.configure(configuration); + adapterFactory.configure(libraryManager, configuration); return adapterFactory; } // Lookup Adapters - public static LookupAdapterFactory<?> getLookupAdapterFactory(Map<String, String> configuration, - ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainMissing, - IMissingWriterFactory missingWriterFactory) throws AsterixException { + public static LookupAdapterFactory<?> getLookupAdapterFactory(ILibraryManager libraryManager, + Map<String, String> configuration, ARecordType recordType, int[] ridFields, boolean retainInput, + boolean retainMissing, IMissingWriterFactory missingWriterFactory) throws AsterixException { LookupAdapterFactory<?> adapterFactory = new LookupAdapterFactory<>(recordType, ridFields, retainInput, retainMissing, missingWriterFactory); - adapterFactory.configure(configuration); + adapterFactory.configure(libraryManager, configuration); return adapterFactory; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index 452ac6e..ad11171 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType; import org.apache.asterix.external.api.IInputStreamFactory; @@ -41,24 +42,25 @@ private DatasourceFactoryProvider() { } - public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration) - throws AsterixException { + public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager, + Map<String, String> configuration) throws AsterixException { if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) { String reader = configuration.get(ExternalDataConstants.KEY_READER); - return DatasourceFactoryProvider.getRecordReaderFactory(reader, configuration); + return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration); } else { // get stream source String streamSource = configuration.get(ExternalDataConstants.KEY_STREAM_SOURCE); - return DatasourceFactoryProvider.getInputStreamFactory(streamSource, configuration); + return DatasourceFactoryProvider.getInputStreamFactory(libraryManager, streamSource, configuration); } } - public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration) - throws AsterixException { + public static IInputStreamFactory getInputStreamFactory(ILibraryManager libraryManager, String streamSource, + Map<String, String> configuration) throws AsterixException { IInputStreamFactory streamSourceFactory; if (ExternalDataUtils.isExternal(streamSource)) { String dataverse = ExternalDataUtils.getDataverse(configuration); - streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource); + streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, + streamSource); } else { switch (streamSource) { case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM: @@ -85,10 +87,10 @@ return streamSourceFactory; } - public static IRecordReaderFactory<?> getRecordReaderFactory(String reader, Map<String, String> configuration) - throws AsterixException { + public static IRecordReaderFactory<?> getRecordReaderFactory(ILibraryManager libraryManager, String reader, + Map<String, String> configuration) throws AsterixException { if (reader.equals(ExternalDataConstants.EXTERNAL)) { - return ExternalDataUtils.createExternalRecordReaderFactory(configuration); + return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration); } switch (reader) { case ExternalDataConstants.READER_HDFS: diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java index b37198a..e98e744 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java @@ -23,6 +23,7 @@ import javax.annotation.Nonnull; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IDataParserFactory; import org.apache.asterix.external.parser.factory.ADMDataParserFactory; import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory; @@ -38,12 +39,13 @@ private ParserFactoryProvider() { } - public static IDataParserFactory getDataParserFactory(Map<String, String> configuration) throws AsterixException { + public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager, + Map<String, String> configuration) throws AsterixException { IDataParserFactory parserFactory; String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER); if ((parserFactoryName != null) && ExternalDataUtils.isExternal(parserFactoryName)) { - return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration), - parserFactoryName); + return ExternalDataUtils.createExternalParserFactory(libraryManager, + ExternalDataUtils.getDataverse(configuration), parserFactoryName); } else { String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration); if (parserFactoryKey == null) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 69882d0..19781f9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -22,11 +22,11 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IDataParserFactory; import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.AUnionType; @@ -114,8 +114,8 @@ && (aString.trim().length() > 1)); } - public static ClassLoader getClassLoader(String dataverse, String library) { - return ExternalLibraryManager.getLibraryClassLoader(dataverse, library); + public static ClassLoader getClassLoader(ILibraryManager libraryManager, String dataverse, String library) { + return libraryManager.getLibraryClassLoader(dataverse, library); } public static String getLibraryName(String aString) { @@ -126,12 +126,12 @@ return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1]; } - public static IInputStreamFactory createExternalInputStreamFactory(String dataverse, String stream) - throws AsterixException { + public static IInputStreamFactory createExternalInputStreamFactory(ILibraryManager libraryManager, String dataverse, + String stream) throws AsterixException { try { String libraryName = getLibraryName(stream); String className = getExternalClassName(stream); - ClassLoader classLoader = getClassLoader(dataverse, libraryName); + ClassLoader classLoader = getClassLoader(libraryManager, dataverse, libraryName); return ((IInputStreamFactory) (classLoader.loadClass(className).newInstance())); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { throw new AsterixException("Failed to create stream factory", e); @@ -210,8 +210,8 @@ return false; } - public static IRecordReaderFactory<?> createExternalRecordReaderFactory(Map<String, String> configuration) - throws AsterixException { + public static IRecordReaderFactory<?> createExternalRecordReaderFactory(ILibraryManager libraryManager, + Map<String, String> configuration) throws AsterixException { String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY); if (readerFactory == null) { throw new AsterixException("to use " + ExternalDataConstants.EXTERNAL + " reader, the parameter " @@ -228,8 +228,7 @@ + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\""); } - ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverseAndLibrary[0], - dataverseAndLibrary[1]); + ClassLoader classLoader = libraryManager.getLibraryClassLoader(dataverseAndLibrary[0], dataverseAndLibrary[1]); try { return (IRecordReaderFactory<?>) classLoader.loadClass(libraryAndFactory[1]).newInstance(); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { @@ -237,12 +236,12 @@ } } - public static IDataParserFactory createExternalParserFactory(String dataverse, String parserFactoryName) - throws AsterixException { + public static IDataParserFactory createExternalParserFactory(ILibraryManager libraryManager, String dataverse, + String parserFactoryName) throws AsterixException { try { String library = parserFactoryName.substring(0, parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR)); - ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library); + ClassLoader classLoader = libraryManager.getLibraryClassLoader(dataverse, library); return (IDataParserFactory) classLoader .loadClass(parserFactoryName .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1)) diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index e029c09..6850c2b 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.IAsterixPropertiesProvider; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; @@ -77,11 +78,9 @@ ADMDataParser parser; ITupleForwarder forwarder; ArrayTupleBuilder tb; - IAsterixPropertiesProvider propertiesProvider = - (IAsterixPropertiesProvider) ((NodeControllerService) ctx - .getJobletContext().getApplicationContext().getControllerService()) - .getApplicationContext() - .getApplicationObject(); + IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx + .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext() + .getApplicationObject(); ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions() .get(nodeId)[0]; try { @@ -127,7 +126,7 @@ } @Override - public void configure(Map<String, String> configuration) { + public void configure(ILibraryManager context, Map<String, String> configuration) { this.configuration = configuration; } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java index dae8a8f..5b23094 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java @@ -50,7 +50,8 @@ TreeMap<String, String> parserConf = new TreeMap<String, String>(); format = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT); parserConf.put(ExternalDataConstants.KEY_FORMAT, format); - recordParserFactory = (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(parserConf); + recordParserFactory = + (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(null, parserConf); recordParserFactory.setRecordType(recordType); recordParserFactory.configure(configuration); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java index 226ab3c..a5e5cda 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java @@ -42,6 +42,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.StoragePathUtil; @@ -194,6 +195,7 @@ private boolean isTemporaryDatasetWriteJob = true; private final AsterixStorageProperties storageProperties; + private final ILibraryManager libraryManager; public String getPropertyValue(String propertyName) { return config.get(propertyName); @@ -215,6 +217,11 @@ this.defaultDataverse = defaultDataverse; this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores(); this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties(); + this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager(); + } + + public ILibraryManager getLibraryManager() { + return libraryManager; } public void setJobId(JobId jobId) { @@ -482,8 +489,8 @@ List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException { try { configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName()); - IAdapterFactory adapterFactory = - AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, itemType, metaType); + IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName, + configuration, itemType, metaType); // check to see if dataset is indexed Index filesIndex = @@ -535,7 +542,8 @@ public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime( JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception { Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null; - factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx); + factoryOutput = + FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx, libraryManager); ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME); IAdapterFactory adapterFactory = factoryOutput.first; @@ -2080,8 +2088,8 @@ return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } - public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> - splitProviderAndPartitionConstraintsForDataverse(String dataverse) { + public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse( + String dataverse) { return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse); } @@ -2104,8 +2112,8 @@ } public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException { - String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) - : dataverse; + String dv = + dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse; if (dv == null) { return null; } @@ -2195,7 +2203,7 @@ this.locks = locks; } - public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( + public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput, IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema, JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainMissing) @@ -2209,7 +2217,7 @@ // Create the adapter factory <- right now there is only one. if there are more in the future, we can create // a map-> ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); - LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory( + LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(libraryManager, datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainMissing, context.getMissingWriterFactory()); @@ -2349,9 +2357,8 @@ ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; - ISerializerDeserializer[] outputSerDes = - new ISerializerDeserializer[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) - + numFilterFields]; + ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; for (int j = 0; j < recordDesc.getFieldCount(); j++) { outputTypeTraits[j] = recordDesc.getTypeTraits()[j]; outputSerDes[j] = recordDesc.getFields()[j]; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 1a51b74..6b55d81 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -36,6 +36,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType; @@ -44,7 +45,6 @@ import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.runtime.FeedRuntimeId; -import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor; import org.apache.asterix.external.provider.AdapterFactoryProvider; @@ -150,8 +150,7 @@ boolean preProcessingRequired = preProcessingRequired(feedConnectionId); // copy operators String operandId = null; - Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = - new HashMap<OperatorDescriptorId, OperatorDescriptorId>(); + Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<>(); FeedMetaOperatorDescriptor metaOp = null; for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) { operandId = FeedRuntimeId.DEFAULT_TARGET_ID; @@ -197,8 +196,7 @@ } // copy connectors - Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = - new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>(); + Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<>(); for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) { IConnectorDescriptor connDesc = entry.getValue(); ConnectorDescriptorId newConnId; @@ -229,9 +227,8 @@ } // prepare for setting partition constraints - Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = - new HashMap<OperatorDescriptorId, List<LocationConstraint>>(); - Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>(); + Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>(); + Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>(); for (Constraint constraint : spec.getUserConstraints()) { LValueConstraintExpression lexpr = constraint.getLValue(); @@ -454,7 +451,7 @@ return preProcessingRequired; } - public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx) + public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx, ILibraryManager libraryManager) throws AsterixException { try { String adapterName = feed.getAdapterName(); @@ -464,9 +461,8 @@ ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName()); ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName()); // Get adapter from metadata dataset <Metadata dataverse> - DatasourceAdapter adapterEntity = - MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, - adapterName); + DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, + MetadataConstants.METADATA_DATAVERSE_NAME, adapterName); // Get adapter from metadata dataset <The feed dataverse> if (adapterEntity == null) { adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName); @@ -483,8 +479,7 @@ case EXTERNAL: String[] anameComponents = adapterName.split("#"); String libraryName = anameComponents[0]; - ClassLoader cl = - ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName); + ClassLoader cl = libraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName); adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); break; default: @@ -492,9 +487,9 @@ } adapterFactory.setOutputType(adapterOutputType); adapterFactory.setMetaType(metaType); - adapterFactory.configure(configuration); + adapterFactory.configure(null, configuration); } else { - AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType, + AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName, configuration, adapterOutputType, metaType); } if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) { @@ -520,9 +515,9 @@ } @SuppressWarnings("rawtypes") - public static Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> - getPrimaryFeedFactoryAndOutput(Feed feed, FeedPolicyAccessor policyAccessor, - MetadataTransactionContext mdTxnCtx) throws AlgebricksException { + public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> getPrimaryFeedFactoryAndOutput(Feed feed, + FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ILibraryManager libraryManager) + throws AlgebricksException { // This method needs to be re-visited String adapterName = null; DatasourceAdapter adapterEntity = null; @@ -556,8 +551,7 @@ case EXTERNAL: String[] anameComponents = adapterName.split("#"); String libraryName = anameComponents[0]; - ClassLoader cl = - ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName); + ClassLoader cl = libraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName); adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); break; default: @@ -565,10 +559,10 @@ } adapterFactory.setOutputType(adapterOutputType); adapterFactory.setMetaType(metaType); - adapterFactory.configure(configuration); + adapterFactory.configure(null, configuration); } else { - adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType, - metaType); + adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName, configuration, + adapterOutputType, metaType); adapterType = IDataSourceAdapter.AdapterType.INTERNAL; } if (metaType == null) { @@ -681,7 +675,7 @@ public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx) - throws AlgebricksException, MetadataException, RemoteException, ACIDException { + throws AlgebricksException, MetadataException, RemoteException, ACIDException { String outputType = null; String primaryFeedName = feed.getSourceFeedName(); Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java index e26a92b..bb1e554 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java @@ -33,6 +33,7 @@ import org.apache.asterix.common.config.IAsterixPropertiesProvider; import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider; import org.apache.hyracks.api.application.IApplicationConfig; import org.apache.hyracks.api.application.ICCApplicationContext; @@ -60,13 +61,14 @@ private AsterixReplicationProperties replicationProperties; private final IGlobalRecoveryMaanger globalRecoveryMaanger; private IHyracksClientConnection hcc; + private final ILibraryManager libraryManager; public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc, - IGlobalRecoveryMaanger globalRecoveryMaanger) throws AsterixException { + IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) throws AsterixException { if (INSTANCE != null) { return; } - INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger); + INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger, libraryManager); // Determine whether to use old-style asterix-configuration.xml or new-style configuration. // QQQ strip this out eventually @@ -92,10 +94,11 @@ } private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc, - IGlobalRecoveryMaanger globalRecoveryMaanger) { + IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) { this.appCtx = ccAppCtx; this.hcc = hcc; this.globalRecoveryMaanger = globalRecoveryMaanger; + this.libraryManager = libraryManager; } public static AsterixAppContextInfo getInstance() { @@ -165,4 +168,9 @@ public IGlobalRecoveryMaanger getGlobalRecoveryManager() { return globalRecoveryMaanger; } + + @Override + public ILibraryManager getLibraryManager() { + return libraryManager; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java index 7917e4a..ad3ffc1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java @@ -54,16 +54,17 @@ private final ClusterControllerService ccs; public CCApplicationContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext, - IApplicationConfig appConfig) throws IOException { + IApplicationConfig appConfig) throws IOException { super(serverCtx, appConfig); this.ccContext = ccContext; this.ccs = ccs; - initPendingNodeIds = new HashSet<String>(); - deinitPendingNodeIds = new HashSet<String>(); - jobLifecycleListeners = new ArrayList<IJobLifecycleListener>(); - clusterLifecycleListeners = new ArrayList<IClusterLifecycleListener>(); + initPendingNodeIds = new HashSet<>(); + deinitPendingNodeIds = new HashSet<>(); + jobLifecycleListeners = new ArrayList<>(); + clusterLifecycleListeners = new ArrayList<>(); } + @Override public ICCContext getCCContext() { return ccContext; } -- To view, visit https://asterix-gerrit.ics.uci.edu/980 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I5c287a35ff90c3aea639d3069d5a842e28c5e508 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
