Ian Maxon has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3386 )
Change subject: [WIP] Load UDFs via HTTP ...................................................................... [WIP] Load UDFs via HTTP Change-Id: I6be9fef54c010bdb32f5c78af9b973f9843f442f --- A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java 19 files changed, 400 insertions(+), 99 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/86/3386/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java new file mode 100644 index 0000000..9098c25 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.api.http.server; + +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Arrays; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.app.message.DeleteUdfMessage; +import org.apache.asterix.app.message.LoadUdfMessage; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.QueryStringDecoder; + +public class UdfApiServlet extends AbstractServlet { + + private static final Logger LOGGER = LogManager.getLogger(); + private final ICcApplicationContext appCtx; + private final ICCMessageBroker broker; + + public UdfApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... 
paths) { + super(ctx, paths); + this.appCtx = appCtx; + this.broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + } + + private String getResource(FullHttpRequest req, IServletResponse response) { + String[] path = new QueryStringDecoder(req.uri()).path().split("/"); + if (path.length != 4) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return null; + } + String resourceName = path[path.length - 1]; + return resourceName; + } + + @Override + protected void post(IServletRequest request, IServletResponse response) { + FullHttpRequest req = request.getHttpRequest(); + String resourceName = getResource(req, response); + if (resourceName == null) { + return; + } + try { + File udf = File.createTempFile(resourceName, ".zip"); + RandomAccessFile raf = new RandomAccessFile(udf, "rw"); + raf.setLength(req.content().readableBytes()); + FileChannel fc = raf.getChannel(); + ByteBuffer content = request.getHttpRequest().content().nioBuffer(); + while (content.hasRemaining()) { + fc.write(content); + } + fc.force(true); + fc.close(); + IHyracksClientConnection hcc = appCtx.getHcc(); + DeploymentId udfName = new DeploymentId(resourceName); + ClassLoader cl = appCtx.getLibraryManager().getLibraryClassLoader("Default", resourceName); + if (cl != null) { + deleteUdf(resourceName); + } + hcc.deployBinary(udfName, Arrays.asList(udf.toString())); + ExternalLibraryUtils.setUpExternaLibrary(appCtx.getLibraryManager(), false, + FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(), + "applications", udfName.toString())); + LoadUdfMessage msg = new LoadUdfMessage(udfName.toString()); + for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) { + broker.sendApplicationMessageToNC(msg, nc); + } + } catch (Exception e) { + response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); + return; + } + response.setStatus(HttpResponseStatus.OK); + + } + + private void deleteUdf(String resourceName) throws 
Exception { + DeleteUdfMessage msg = new DeleteUdfMessage(resourceName); + for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) { + broker.sendApplicationMessageToNC(msg, nc); + } + appCtx.getLibraryManager().deregisterLibraryClassLoader("Default", resourceName); + appCtx.getHcc().unDeployBinary(new DeploymentId(resourceName)); + } + + @Override + protected void delete(IServletRequest request, IServletResponse response) { + String resourceName = getResource(request.getHttpRequest(), response); + if (resourceName == null) { + return; + } + try { + deleteUdf(resourceName); + } catch (Exception e) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + response.setStatus(HttpResponseStatus.OK); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index 0eac212..3beea8e 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -60,8 +60,8 @@ private ExternalLibraryUtils() { } - public static void setUpExternaLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode) - throws Exception { + public static void setUpExternaLibrary(ILibraryManager externalLibraryManager, boolean isMetadataNode, + String libraryPath) throws Exception { // start by un-installing removed libraries (Metadata Node only) Map<String, List<String>> uninstalledLibs = null; if (isMetadataNode) { @@ -69,18 +69,26 @@ } // get the directory of the to be installed libraries - File installLibDir = getLibraryInstallDir(); + File installLibDir = new File(libraryPath); // directory exists? 
if (installLibDir.exists()) { // get the list of files in the directory - for (File dataverseDir : installLibDir.listFiles(File::isDirectory)) { - for (File libraryDir : dataverseDir.listFiles(File::isDirectory)) { - // For each file (library), register classloader and configure its parameter. - // If current node is Metadata Node, add the library to metadata. - registerClassLoader(externalLibraryManager, dataverseDir.getName(), libraryDir.getName()); - configureLibrary(externalLibraryManager, dataverseDir.getName(), libraryDir, uninstalledLibs, - isMetadataNode); - } + registerClassLoader(externalLibraryManager, "Default", installLibDir.getAbsolutePath()); + configureLibrary(externalLibraryManager, "Default", installLibDir, uninstalledLibs, isMetadataNode); + } + } + + public static void setUpInstalledLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode, + File appDir) throws Exception { + File[] libs = appDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return dir.isDirectory(); + } + }); + if (libs != null) { + for (File lib : libs) { + setUpExternaLibrary(externalLibraryManager, isMetadataNode, lib.getAbsolutePath()); } } } @@ -134,7 +142,7 @@ * @throws RemoteException * @throws ACIDException */ - protected static boolean uninstallLibrary(String dataverse, String libraryName) + public static boolean uninstallLibrary(String dataverse, String libraryName) throws AsterixException, RemoteException, ACIDException { MetadataTransactionContext mdTxnCtx = null; try { @@ -303,15 +311,15 @@ * register the library class loader with the external library manager * * @param dataverse - * @param libraryName + * @param libraryPath * @throws Exception */ protected static void registerClassLoader(ILibraryManager externalLibraryManager, String dataverse, - String libraryName) throws Exception { + String libraryPath) throws Exception { // get the class loader - ClassLoader classLoader = 
getLibraryClassLoader(dataverse, libraryName); + ClassLoader classLoader = getLibraryClassLoader(dataverse, libraryPath); // register it with the external library manager - externalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader); + externalLibraryManager.registerLibraryClassLoader(dataverse, new File(libraryPath).getName(), classLoader); } /** @@ -331,22 +339,22 @@ /** * Get the class loader for the library * + * @param libraryPath * @param dataverse - * @param libraryName * @return * @throws Exception */ - private static ClassLoader getLibraryClassLoader(String dataverse, String libraryName) throws Exception { + private static ClassLoader getLibraryClassLoader(String dataverse, String libraryPath) throws Exception { // Get a reference to the library directory - File installDir = getLibraryInstallDir(); + File installDir = new File(libraryPath); if (LOGGER.isInfoEnabled()) { - LOGGER.info("Installing lirbary " + libraryName + " in dataverse " + dataverse + "." - + " Install Directory: " + installDir.getAbsolutePath()); + LOGGER.info("Installing lirbary " + dataverse + " in dataverse " + dataverse + "." 
+ " Install Directory: " + + installDir.getAbsolutePath()); } // get a reference to the specific library dir - File libDir = - new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName); + File libDir = installDir; + FilenameFilter jarFileFilter = new FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -388,7 +396,7 @@ } if (LOGGER.isInfoEnabled()) { - StringBuilder logMesg = new StringBuilder("Classpath for library " + libraryName + "\n"); + StringBuilder logMesg = new StringBuilder("Classpath for library " + dataverse + "\n"); for (URL url : urls) { logMesg.append(url.getFile() + "\n"); } @@ -397,14 +405,6 @@ // create and return the class loader return new ExternalLibraryClassLoader(urls, parentClassLoader); - } - - /** - * @return the directory "System.getProperty("app.home", System.getProperty("user.home")/lib/udfs" - */ - protected static File getLibraryInstallDir() { - return new File(System.getProperty("app.home", System.getProperty("user.home")) + File.separator + "lib" - + File.separator + "udfs"); } /** diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java index 7246925..3cf2fed 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java @@ -18,22 +18,17 @@ */ package org.apache.asterix.app.external; +import static org.apache.hyracks.control.common.deployment.DeploymentUtils.unzip; + import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.rmi.RemoteException; -import java.util.Enumeration; import java.util.HashMap; import java.util.List; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; 
import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.io.FileUtils; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -49,41 +44,13 @@ } public static void removeLibraryDir() throws IOException { - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); - FileUtils.deleteQuietly(installLibDir); - } - - public static void unzip(String sourceFile, String outputDir) throws IOException { - if (System.getProperty("os.name").toLowerCase().startsWith("win")) { - try (ZipFile zipFile = new ZipFile(sourceFile)) { - Enumeration<? extends ZipEntry> entries = zipFile.entries(); - while (entries.hasMoreElements()) { - ZipEntry entry = entries.nextElement(); - File entryDestination = new File(outputDir, entry.getName()); - if (!entry.isDirectory()) { - entryDestination.getParentFile().mkdirs(); - try (InputStream in = zipFile.getInputStream(entry); - OutputStream out = new FileOutputStream(entryDestination)) { - IOUtils.copy(in, out); - } - } - } - } - } else { - Process process = new ProcessBuilder("unzip", "-d", outputDir, sourceFile).start(); - try { - process.waitFor(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } + //stub } @Override public void install(String dvName, String libName, String libPath) throws Exception { // get the directory of the to be installed libraries - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); + File installLibDir = new File(libPath); // directory exists? 
if (!installLibDir.exists()) { installLibDir.mkdir(); @@ -124,7 +91,7 @@ } } // get the directory of the to be installed libraries - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); - FileUtils.deleteQuietly(installLibDir); + // File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); + // FileUtils.deleteQuietly(installLibDir); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java new file mode 100644 index 0000000..64e0bb1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.message; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; + +public class DeleteUdfMessage implements INcAddressedMessage { + + private final String deploymentId; + + public DeleteUdfMessage(String deploymentId) { + this.deploymentId = deploymentId; + } + + @Override + public void handle(INcApplicationContext appCtx) { + ILibraryManager mgr = appCtx.getLibraryManager(); + String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName(); + String nodeName = appCtx.getServiceContext().getNodeId(); + boolean isMdNode = mdNodeName.equals(nodeName); + try { + if (isMdNode) { + ExternalLibraryUtils.uninstallLibrary("Default", deploymentId); + } + mgr.deregisterLibraryClassLoader("Default", deploymentId); + } catch (Exception e) { + + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java new file mode 100644 index 0000000..cccf3af --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.hyracks.util.file.FileUtil; + +public class LoadUdfMessage implements INcAddressedMessage { + + private final String deploymentId; + + public LoadUdfMessage(String deploymentId) { + this.deploymentId = deploymentId; + } + + @Override + public void handle(INcApplicationContext appCtx) { + ILibraryManager mgr = appCtx.getLibraryManager(); + String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName(); + String nodeName = appCtx.getServiceContext().getNodeId(); + boolean isMdNode = mdNodeName.equals(nodeName); + try { + ExternalLibraryUtils.setUpExternaLibrary(mgr, isMdNode, + FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(), + "applications", deploymentId)); + } catch (Exception e) { + + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java index 8cfeb12..1ca2b78 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java @@ -38,7 +38,8 @@ public void perform(CcId ccId, 
IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { - ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode); + ExternalLibraryUtils.setUpInstalledLibraries(appContext.getLibraryManager(), metadataNode, + cs.getContext().getServerCtx().getAppDir()); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 99286a2..ed8a3d8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -47,6 +47,7 @@ import org.apache.asterix.api.http.server.RebalanceApiServlet; import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.api.http.server.ShutdownApiServlet; +import org.apache.asterix.api.http.server.UdfApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; @@ -148,7 +149,7 @@ ReplicationProperties repProp = new ReplicationProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())); INcLifecycleCoordinator lifecycleCoordinator = createNcLifeCycleCoordinator(repProp.isReplicationEnabled()); - ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); + ExternalLibraryUtils.setUpInstalledLibraries(libraryManager, false, ccServiceCtx.getServerCtx().getAppDir()); componentProvider = new StorageComponentProvider(); ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions())); @@ -259,6 +260,7 @@ addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE 
addServlet(jsonAPIServer, Servlets.DIAGNOSTICS); addServlet(jsonAPIServer, Servlets.ACTIVE_STATS); + addServlet(jsonAPIServer, Servlets.UDF); return jsonAPIServer; } @@ -311,6 +313,8 @@ return new DiagnosticsApiServlet(appCtx, ctx, paths); case Servlets.ACTIVE_STATS: return new ActiveStatsApiServlet(appCtx, ctx, paths); + case Servlets.UDF: + return new UdfApiServlet(appCtx, ctx, paths); default: throw new IllegalStateException(String.valueOf(key)); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index d5aa5d1..93a959f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -36,6 +36,7 @@ public static final String ACTIVE_STATS = "/admin/active/*"; public static final String STORAGE = "/admin/storage/*"; public static final String NET_DIAGNOSTICS = "/admin/net/*"; + public static final String UDF = "/admin/udf/*"; private Servlets() { } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java new file mode 100644 index 0000000..b997a6e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.application; + +import java.io.File; + +public interface IServerContext { + enum ServerType { + CLUSTER_CONTROLLER, + NODE_CONTROLLER, + } + + ServerType getServerType(); + + File getBaseDir(); + + File getAppDir(); +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java index 6effee3..0a99c45 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java @@ -69,4 +69,6 @@ default IPersistedResourceRegistry getPersistedResourceRegistry() { throw new UnsupportedOperationException(); } + + IServerContext getServerCtx(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 89f2ad4..44018b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -159,12 +159,22 @@ void waitForCompletion(JobId jobId) throws Exception; /** - * Deploy the user-defined jars to the cluster + * Deploy files to the 
cluster * - * @param jars - * a list of user-defined jars + * @param files + * a list of file paths */ - DeploymentId deployBinary(List<String> jars) throws Exception; + DeploymentId deployBinary(List<String> files) throws Exception; + + /** + * Deploy files to the cluster + * + * @param files + * a list of file paths + * @param deploymentId + * the id used to uniquely identify this set of files for management + */ + void deployBinary(DeploymentId deploymentId, List<String> files) throws Exception; /** * undeploy a certain deployment diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java index 99fc76b..b6d32a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java @@ -21,16 +21,16 @@ import java.io.Serializable; import java.util.concurrent.ThreadFactory; +import org.apache.hyracks.api.application.IServerContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.io.IPersistedResourceRegistry; import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer; import org.apache.hyracks.api.job.JobSerializerDeserializerContainer; import org.apache.hyracks.api.messages.IMessageBroker; -import org.apache.hyracks.control.common.context.ServerContext; public abstract class ServiceContext implements IServiceContext { - protected final ServerContext serverCtx; + protected final IServerContext serverCtx; protected final IApplicationConfig appConfig; protected ThreadFactory 
threadFactory; protected Serializable distributedState; @@ -38,7 +38,7 @@ protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer(); protected IPersistedResourceRegistry persistedResourceRegistry; - public ServiceContext(ServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) { + public ServiceContext(IServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) { this.serverCtx = serverCtx; this.appConfig = appConfig; this.threadFactory = threadFactory; @@ -88,4 +88,9 @@ public IPersistedResourceRegistry getPersistedResourceRegistry() { return persistedResourceRegistry; } + + @Override + public IServerContext getServerCtx() { + return serverCtx; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java index ff037f0..ef2777d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java @@ -20,18 +20,18 @@ import java.io.File; -public class ServerContext { - public enum ServerType { - CLUSTER_CONTROLLER, - NODE_CONTROLLER, - } +import org.apache.hyracks.api.application.IServerContext; + +public class ServerContext implements IServerContext { private final ServerType type; private final File baseDir; + private final File appDir; public ServerContext(ServerType type, File baseDir) { this.type = type; this.baseDir = baseDir; + this.appDir = new File(baseDir, "applications"); } public ServerType getServerType() { @@ -41,4 +41,8 @@ public File getBaseDir() { return baseDir; } + + public File 
getAppDir() { + return appDir; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java index 31e6e39..f0d7828 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java @@ -73,7 +73,7 @@ }); try { if (classLoader == null) { - /** crate a new classloader */ + /** create a new classloader */ URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]); classLoader = new MutableURLClassLoader(urls, this.getClass().getClassLoader()); } else { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java index a079d97..773ceaf 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java @@ -26,7 +26,10 @@ import java.io.OutputStream; import java.net.URL; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -86,8 +89,7 @@ * @param container * 
the container of serializer/deserializer * @param ctx - * the ServerContext - * @param isNC + * the ServerContext * @param isNC * true is NC/false is CC * @throws HyracksException */ @@ -101,7 +103,13 @@ String rootDir = ctx.getBaseDir().toString(); String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId; - jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC)); + String[] fileSplits = urls.get(0).toString().split("\\."); + String extension = fileSplits[fileSplits.length - 1]; + if (urls.size() == 1 && "zip".equalsIgnoreCase(extension)) { + downloadURLs(urls, deploymentDir, isNC, true); + } else { + jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC, false)); + } } /** @@ -176,7 +184,8 @@ * @return a list of local file URLs * @throws HyracksException */ - private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException { + private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC, boolean isZip) + throws HyracksException { //retry 10 times at maximum for downloading binaries int retryCount = 10; int tried = 0; @@ -208,6 +217,9 @@ is.close(); } } + if (isZip) { + unzip(targetFile.getAbsolutePath(), deploymentDir); + } downloadedFileURLs.add(targetFile.toURI().toURL()); } return downloadedFileURLs; @@ -218,4 +230,21 @@ } throw HyracksException.create(trace); } + + public static void unzip(String sourceFile, String outputDir) throws IOException { + try (ZipFile zipFile = new ZipFile(sourceFile)) { + Enumeration<? 
extends ZipEntry> entries = zipFile.entries(); + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + File entryDestination = new File(outputDir, entry.getName()); + if (!entry.isDirectory()) { + entryDestination.getParentFile().mkdirs(); + try (InputStream in = zipFile.getInputStream(entry); + OutputStream out = new FileOutputStream(entryDestination)) { + IOUtils.copy(in, out); + } + } + } + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 517169b..bf07c20 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -222,7 +222,7 @@ deployedJobSpecActivityClusterGraphMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, - new File(new File(NodeControllerService.class.getName()), id)); + new File(ioManager.getWorkspacePath(0), id)); getNodeControllerInfosAcceptor = new MutableObject<>(); memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR)); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index 14404d2..c335312 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -346,6 +346,10 @@ return dev.createFileRef(waPath + File.separator + waf.getName()); } + public String getWorkspacePath(int index) { + return workspaces.get(index) != null ? workspaces.get(index).getWorkspace() : null; + } + @Override public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException { try { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java index 29d12ce..9351348 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java @@ -214,21 +214,27 @@ } @Override - public DeploymentId deployBinary(List<String> jars) throws Exception { + public DeploymentId deployBinary(List<String> files) throws Exception { /** generate a deployment id */ DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString()); + deployBinary(deploymentId, files); + return deploymentId; + } + + @Override + public void deployBinary(DeploymentId deploymentId, List<String> files) throws Exception { List<URL> binaryURLs = new ArrayList<>(); - if (jars != null && !jars.isEmpty()) { + if (files != null && !files.isEmpty()) { CloseableHttpClient hc = new DefaultHttpClient(); try { - /** upload jars through a http client one-by-one to the CC server */ - for (String jar : jars) { - int slashIndex = jar.lastIndexOf('/'); - String fileName = jar.substring(slashIndex + 1); + /** upload files through a http client one-by-one to the CC server */ + for (String file : files) { + int slashIndex = file.lastIndexOf('/'); + String fileName = file.substring(slashIndex + 1); String url = "http://" + ccHost + ":" + 
ccInfo.getWebPort() + "/applications/" + deploymentId.toString() + "&" + fileName; HttpPut put = new HttpPut(url); - put.setEntity(new FileEntity(new File(jar), "application/octet-stream")); + put.setEntity(new FileEntity(new File(file), "application/octet-stream")); HttpResponse response = hc.execute(put); response.getEntity().consumeContent(); if (response.getStatusLine().getStatusCode() != 200) { @@ -244,7 +250,6 @@ } /** deploy the URLs to the CC and NCs */ hci.deployBinary(binaryURLs, deploymentId); - return deploymentId; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java index 4417795..63126ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadFactory; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.application.IServerContext; import org.apache.hyracks.api.application.IStateDumpHandler; import org.apache.hyracks.api.comm.IChannelInterfaceFactory; import org.apache.hyracks.api.config.IApplicationConfig; @@ -152,4 +153,9 @@ public Object getApplicationContext() { return appCtx; } + + @Override + public IServerContext getServerCtx() { + return null; + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3386 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: I6be9fef54c010bdb32f5c78af9b973f9843f442f Gerrit-Change-Number: 3386 Gerrit-PatchSet: 1 Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]>
