Ian Maxon has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3386 )


Change subject: [WIP] Load UDFs via HTTP
......................................................................

[WIP] Load UDFs via HTTP

Change-Id: I6be9fef54c010bdb32f5c78af9b973f9843f442f
---
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
19 files changed, 400 insertions(+), 99 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/86/3386/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
new file mode 100644
index 0000000..9098c25
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.message.DeleteUdfMessage;
+import org.apache.asterix.app.message.LoadUdfMessage;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+
+/**
+ * Servlet that installs (POST) and removes (DELETE) a UDF library named by the last segment of the request
+ * path. A POST writes the request body to a temporary zip file, deploys it through the Hyracks client
+ * connection under a deployment id equal to the library name, registers the library with the CC's library
+ * manager and then sends a {@link LoadUdfMessage} to every participant NC.
+ */
+public class UdfApiServlet extends AbstractServlet {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    // Every library handled by this servlet is registered under this dataverse name.
+    private static final String DATAVERSE = "Default";
+    // Element count obtained when a path such as /admin/udf/<name> is split on '/'.
+    private static final int EXPECTED_PATH_PARTS = 4;
+    private final ICcApplicationContext appCtx;
+    private final ICCMessageBroker broker;
+
+    public UdfApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) {
+        super(ctx, paths);
+        this.appCtx = appCtx;
+        this.broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+    }
+
+    /**
+     * Extracts the library name from the request path.
+     *
+     * @param req
+     *            the incoming request
+     * @param response
+     *            the response; its status is set to BAD_REQUEST when the path is not acceptable
+     * @return the last path segment, or {@code null} if the path does not have the expected number of
+     *         segments or the segment is a relative path component
+     */
+    private String getResource(FullHttpRequest req, IServletResponse response) {
+        String[] path = new QueryStringDecoder(req.uri()).path().split("/");
+        if (path.length != EXPECTED_PATH_PARTS) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return null;
+        }
+        String resourceName = path[path.length - 1];
+        // The name is later joined into a file-system path (see post), so refuse relative components.
+        if (".".equals(resourceName) || "..".equals(resourceName)) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return null;
+        }
+        return resourceName;
+    }
+
+    /**
+     * Installs (or replaces) the library named in the request path using the request body as its archive.
+     * Responds with OK on success and SERVICE_UNAVAILABLE if any step fails.
+     */
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) {
+        FullHttpRequest req = request.getHttpRequest();
+        String resourceName = getResource(req, response);
+        if (resourceName == null) {
+            return;
+        }
+        try {
+            // NOTE(review): this temporary file is never removed; confirm whether it can be deleted once
+            // deployBinary has returned.
+            File udf = File.createTempFile(resourceName, ".zip");
+            ByteBuffer content = req.content().nioBuffer();
+            // try-with-resources so the file handle is released even when the write fails
+            try (RandomAccessFile raf = new RandomAccessFile(udf, "rw"); FileChannel fc = raf.getChannel()) {
+                raf.setLength(req.content().readableBytes());
+                while (content.hasRemaining()) {
+                    fc.write(content);
+                }
+                fc.force(true);
+            }
+            IHyracksClientConnection hcc = appCtx.getHcc();
+            DeploymentId udfName = new DeploymentId(resourceName);
+            ClassLoader cl = appCtx.getLibraryManager().getLibraryClassLoader(DATAVERSE, resourceName);
+            if (cl != null) {
+                // a library with this name is already registered: remove it before redeploying
+                deleteUdf(resourceName);
+            }
+            hcc.deployBinary(udfName, Arrays.asList(udf.toString()));
+            ExternalLibraryUtils.setUpExternaLibrary(appCtx.getLibraryManager(), false,
+                    FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
+                            "applications", udfName.toString()));
+            LoadUdfMessage msg = new LoadUdfMessage(udfName.toString());
+            for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) {
+                broker.sendApplicationMessageToNC(msg, nc);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to load UDF " + resourceName, e);
+            response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+            return;
+        }
+        response.setStatus(HttpResponseStatus.OK);
+    }
+
+    /**
+     * Sends a {@link DeleteUdfMessage} to every participant NC, deregisters the library's class loader on
+     * the CC and undeploys the binary.
+     *
+     * @param resourceName
+     *            the library name, also used as the deployment id
+     * @throws Exception
+     *             if messaging an NC or undeploying fails
+     */
+    private void deleteUdf(String resourceName) throws Exception {
+        DeleteUdfMessage msg = new DeleteUdfMessage(resourceName);
+        for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) {
+            broker.sendApplicationMessageToNC(msg, nc);
+        }
+        appCtx.getLibraryManager().deregisterLibraryClassLoader(DATAVERSE, resourceName);
+        appCtx.getHcc().unDeployBinary(new DeploymentId(resourceName));
+    }
+
+    /**
+     * Removes the library named in the request path. Responds with OK on success and BAD_REQUEST if the
+     * removal fails.
+     */
+    @Override
+    protected void delete(IServletRequest request, IServletResponse response) {
+        String resourceName = getResource(request.getHttpRequest(), response);
+        if (resourceName == null) {
+            return;
+        }
+        try {
+            deleteUdf(resourceName);
+        } catch (Exception e) {
+            LOGGER.error("Failed to delete UDF " + resourceName, e);
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
+        response.setStatus(HttpResponseStatus.OK);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 0eac212..3beea8e 100755
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -60,8 +60,8 @@
     private ExternalLibraryUtils() {
     }

-    public static void setUpExternaLibraries(ILibraryManager 
externalLibraryManager, boolean isMetadataNode)
-            throws Exception {
+    public static void setUpExternaLibrary(ILibraryManager 
externalLibraryManager, boolean isMetadataNode,
+            String libraryPath) throws Exception {
         // start by un-installing removed libraries (Metadata Node only)
         Map<String, List<String>> uninstalledLibs = null;
         if (isMetadataNode) {
@@ -69,18 +69,26 @@
         }

         // get the directory of the to be installed libraries
-        File installLibDir = getLibraryInstallDir();
+        File installLibDir = new File(libraryPath);
         // directory exists?
         if (installLibDir.exists()) {
             // get the list of files in the directory
-            for (File dataverseDir : 
installLibDir.listFiles(File::isDirectory)) {
-                for (File libraryDir : 
dataverseDir.listFiles(File::isDirectory)) {
-                    // For each file (library), register classloader and 
configure its parameter.
-                    // If current node is Metadata Node, add the library to 
metadata.
-                    registerClassLoader(externalLibraryManager, 
dataverseDir.getName(), libraryDir.getName());
-                    configureLibrary(externalLibraryManager, 
dataverseDir.getName(), libraryDir, uninstalledLibs,
-                            isMetadataNode);
-                }
+            registerClassLoader(externalLibraryManager, "Default", 
installLibDir.getAbsolutePath());
+            configureLibrary(externalLibraryManager, "Default", installLibDir, 
uninstalledLibs, isMetadataNode);
+        }
+    }
+
+    /**
+     * Sets up every library found as a sub-directory of the given applications directory by calling
+     * {@code setUpExternaLibrary} on each one. Does nothing if the directory cannot be listed.
+     *
+     * @param externalLibraryManager
+     *            the library manager to register the libraries with
+     * @param isMetadataNode
+     *            passed through to {@code setUpExternaLibrary}
+     * @param appDir
+     *            the directory whose sub-directories are treated as installed libraries
+     * @throws Exception
+     *             if setting up any library fails
+     */
+    public static void setUpInstalledLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode,
+            File appDir) throws Exception {
+        File[] libs = appDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                // 'dir' is the directory being listed (always a directory); test the entry itself
+                return new File(dir, name).isDirectory();
+            }
+        });
+        if (libs != null) {
+            for (File lib : libs) {
+                setUpExternaLibrary(externalLibraryManager, isMetadataNode, lib.getAbsolutePath());
             }
         }
     }
@@ -134,7 +142,7 @@
      * @throws RemoteException
      * @throws ACIDException
      */
-    protected static boolean uninstallLibrary(String dataverse, String 
libraryName)
+    public static boolean uninstallLibrary(String dataverse, String 
libraryName)
             throws AsterixException, RemoteException, ACIDException {
         MetadataTransactionContext mdTxnCtx = null;
         try {
@@ -303,15 +311,15 @@
      * register the library class loader with the external library manager
      *
      * @param dataverse
-     * @param libraryName
+     * @param libraryPath
      * @throws Exception
      */
     protected static void registerClassLoader(ILibraryManager 
externalLibraryManager, String dataverse,
-            String libraryName) throws Exception {
+            String libraryPath) throws Exception {
         // get the class loader
-        ClassLoader classLoader = getLibraryClassLoader(dataverse, 
libraryName);
+        ClassLoader classLoader = getLibraryClassLoader(dataverse, 
libraryPath);
         // register it with the external library manager
-        externalLibraryManager.registerLibraryClassLoader(dataverse, 
libraryName, classLoader);
+        externalLibraryManager.registerLibraryClassLoader(dataverse, new 
File(libraryPath).getName(), classLoader);
     }

     /**
@@ -331,22 +339,22 @@
     /**
      * Get the class loader for the library
      *
+     * @param libraryPath
      * @param dataverse
-     * @param libraryName
      * @return
      * @throws Exception
      */
-    private static ClassLoader getLibraryClassLoader(String dataverse, String 
libraryName) throws Exception {
+    private static ClassLoader getLibraryClassLoader(String dataverse, String 
libraryPath) throws Exception {
         // Get a reference to the library directory
-        File installDir = getLibraryInstallDir();
+        File installDir = new File(libraryPath);
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Installing lirbary " + libraryName + " in dataverse " 
+ dataverse + "."
-                    + " Install Directory: " + installDir.getAbsolutePath());
+            LOGGER.info("Installing lirbary " + dataverse + " in dataverse " + 
dataverse + "." + " Install Directory: "
+                    + installDir.getAbsolutePath());
         }

         // get a reference to the specific library dir
-        File libDir =
-                new File(installDir.getAbsolutePath() + File.separator + 
dataverse + File.separator + libraryName);
+        File libDir = installDir;
+
         FilenameFilter jarFileFilter = new FilenameFilter() {
             @Override
             public boolean accept(File dir, String name) {
@@ -388,7 +396,7 @@
         }

         if (LOGGER.isInfoEnabled()) {
-            StringBuilder logMesg = new StringBuilder("Classpath for library " 
+ libraryName + "\n");
+            StringBuilder logMesg = new StringBuilder("Classpath for library " 
+ dataverse + "\n");
             for (URL url : urls) {
                 logMesg.append(url.getFile() + "\n");
             }
@@ -397,14 +405,6 @@

         // create and return the class loader
         return new ExternalLibraryClassLoader(urls, parentClassLoader);
-    }
-
-    /**
-     * @return the directory "System.getProperty("app.home", 
System.getProperty("user.home")/lib/udfs"
-     */
-    protected static File getLibraryInstallDir() {
-        return new File(System.getProperty("app.home", 
System.getProperty("user.home")) + File.separator + "lib"
-                + File.separator + "udfs");
     }

     /**
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index 7246925..3cf2fed 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -18,22 +18,17 @@
  */
 package org.apache.asterix.app.external;

+import static 
org.apache.hyracks.control.common.deployment.DeploymentUtils.unzip;
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.rmi.RemoteException;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;

 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;

@@ -49,41 +44,13 @@
     }

     public static void removeLibraryDir() throws IOException {
-        File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
-        FileUtils.deleteQuietly(installLibDir);
-    }
-
-    public static void unzip(String sourceFile, String outputDir) throws 
IOException {
-        if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
-            try (ZipFile zipFile = new ZipFile(sourceFile)) {
-                Enumeration<? extends ZipEntry> entries = zipFile.entries();
-                while (entries.hasMoreElements()) {
-                    ZipEntry entry = entries.nextElement();
-                    File entryDestination = new File(outputDir, 
entry.getName());
-                    if (!entry.isDirectory()) {
-                        entryDestination.getParentFile().mkdirs();
-                        try (InputStream in = zipFile.getInputStream(entry);
-                                OutputStream out = new 
FileOutputStream(entryDestination)) {
-                            IOUtils.copy(in, out);
-                        }
-                    }
-                }
-            }
-        } else {
-            Process process = new ProcessBuilder("unzip", "-d", outputDir, 
sourceFile).start();
-            try {
-                process.waitFor();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException(e);
-            }
-        }
+        //stub
     }

     @Override
     public void install(String dvName, String libName, String libPath) throws 
Exception {
         // get the directory of the to be installed libraries
-        File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
+        File installLibDir = new File(libPath);
         // directory exists?
         if (!installLibDir.exists()) {
             installLibDir.mkdir();
@@ -124,7 +91,7 @@
             }
         }
         // get the directory of the to be installed libraries
-        File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
-        FileUtils.deleteQuietly(installLibDir);
+        //        File installLibDir = 
ExternalLibraryUtils.getLibraryInstallDir();
+        //        FileUtils.deleteQuietly(installLibDir);
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
new file mode 100644
index 0000000..64e0bb1
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+
+/**
+ * Message handled on an NC to remove a UDF library: on the metadata node the library is uninstalled, and
+ * on every node its class loader is deregistered from the library manager.
+ */
+public class DeleteUdfMessage implements INcAddressedMessage {
+
+    // Fully-qualified so that no additional import is required by this class.
+    private static final org.apache.logging.log4j.Logger LOGGER =
+            org.apache.logging.log4j.LogManager.getLogger();
+
+    private final String deploymentId;
+
+    /**
+     * @param deploymentId
+     *            the deployment id, which is also used as the library name
+     */
+    public DeleteUdfMessage(String deploymentId) {
+        this.deploymentId = deploymentId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        ILibraryManager mgr = appCtx.getLibraryManager();
+        String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
+        String nodeName = appCtx.getServiceContext().getNodeId();
+        boolean isMdNode = mdNodeName.equals(nodeName);
+        try {
+            if (isMdNode) {
+                ExternalLibraryUtils.uninstallLibrary("Default", deploymentId);
+            }
+            mgr.deregisterLibraryClassLoader("Default", deploymentId);
+        } catch (Exception e) {
+            // Still best-effort (nothing is rethrown), but the failure is no longer silent.
+            LOGGER.error("Failed to delete UDF library " + deploymentId + " on node " + nodeName, e);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
new file mode 100644
index 0000000..cccf3af
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.util.file.FileUtil;
+
+/**
+ * Message handled on an NC to set up a deployed UDF library from the directory
+ * {@code <base dir>/applications/<deployment id>} of that node.
+ */
+public class LoadUdfMessage implements INcAddressedMessage {
+
+    // Fully-qualified so that no additional import is required by this class.
+    private static final org.apache.logging.log4j.Logger LOGGER =
+            org.apache.logging.log4j.LogManager.getLogger();
+
+    private final String deploymentId;
+
+    /**
+     * @param deploymentId
+     *            the deployment id, which is also the name of the library's directory
+     */
+    public LoadUdfMessage(String deploymentId) {
+        this.deploymentId = deploymentId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        ILibraryManager mgr = appCtx.getLibraryManager();
+        String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
+        String nodeName = appCtx.getServiceContext().getNodeId();
+        boolean isMdNode = mdNodeName.equals(nodeName);
+        try {
+            ExternalLibraryUtils.setUpExternaLibrary(mgr, isMdNode,
+                    FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
+                            "applications", deploymentId));
+        } catch (Exception e) {
+            // Still best-effort (nothing is rethrown), but the failure is no longer silent.
+            LOGGER.error("Failed to load UDF library " + deploymentId + " on node " + nodeName, e);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 8cfeb12..1ca2b78 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -38,7 +38,8 @@
     public void perform(CcId ccId, IControllerService cs) throws 
HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) 
cs.getApplicationContext();
         try {
-            
ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), 
metadataNode);
+            
ExternalLibraryUtils.setUpInstalledLibraries(appContext.getLibraryManager(), 
metadataNode,
+                    cs.getContext().getServerCtx().getAppDir());
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 99286a2..ed8a3d8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -47,6 +47,7 @@
 import org.apache.asterix.api.http.server.RebalanceApiServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.api.http.server.ShutdownApiServlet;
+import org.apache.asterix.api.http.server.UdfApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
@@ -148,7 +149,7 @@
         ReplicationProperties repProp =
                 new 
ReplicationProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
         INcLifecycleCoordinator lifecycleCoordinator = 
createNcLifeCycleCoordinator(repProp.isReplicationEnabled());
-        ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
+        ExternalLibraryUtils.setUpInstalledLibraries(libraryManager, false, 
ccServiceCtx.getServerCtx().getAppDir());
         componentProvider = new StorageComponentProvider();

         ccExtensionManager = new CCExtensionManager(new 
ArrayList<>(getExtensions()));
@@ -259,6 +260,7 @@
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must 
not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.DIAGNOSTICS);
         addServlet(jsonAPIServer, Servlets.ACTIVE_STATS);
+        addServlet(jsonAPIServer, Servlets.UDF);
         return jsonAPIServer;
     }

@@ -311,6 +313,8 @@
                 return new DiagnosticsApiServlet(appCtx, ctx, paths);
             case Servlets.ACTIVE_STATS:
                 return new ActiveStatsApiServlet(appCtx, ctx, paths);
+            case Servlets.UDF:
+                return new UdfApiServlet(appCtx, ctx, paths);
             default:
                 throw new IllegalStateException(String.valueOf(key));
         }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index d5aa5d1..93a959f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -36,6 +36,7 @@
     public static final String ACTIVE_STATS = "/admin/active/*";
     public static final String STORAGE = "/admin/storage/*";
     public static final String NET_DIAGNOSTICS = "/admin/net/*";
+    public static final String UDF = "/admin/udf/*";

     private Servlets() {
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java
new file mode 100644
index 0000000..b997a6e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.application;
+
+import java.io.File;
+
+/**
+ * Describes the server a service runs in: its type and the directories it works with.
+ */
+public interface IServerContext {
+    /** The kind of server. */
+    enum ServerType {
+        CLUSTER_CONTROLLER,
+        NODE_CONTROLLER,
+    }
+
+    /**
+     * @return the type of this server
+     */
+    ServerType getServerType();
+
+    /**
+     * @return the base directory of this server
+     */
+    File getBaseDir();
+
+    /**
+     * @return the applications directory of this server; {@code ServerContext} creates it as the
+     *         {@code "applications"} child of the base directory
+     */
+    File getAppDir();
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
index 6effee3..0a99c45 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
@@ -69,4 +69,6 @@
     default IPersistedResourceRegistry getPersistedResourceRegistry() {
         throw new UnsupportedOperationException();
     }
+
+    IServerContext getServerCtx();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 89f2ad4..44018b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -159,12 +159,22 @@
     void waitForCompletion(JobId jobId) throws Exception;

     /**
-     * Deploy the user-defined jars to the cluster
+     * Deploy files to the cluster
      *
-     * @param jars
-     *            a list of user-defined jars
+     * @param files
+     *            a list of file paths
      */
-    DeploymentId deployBinary(List<String> jars) throws Exception;
+    DeploymentId deployBinary(List<String> files) throws Exception;
+
+    /**
+     * Deploy files to the cluster
+     *
+     * @param files
+     *            a list of file paths
+     * @param deploymentId
+     *            the id used to uniquely identify this set of files for 
management
+     */
+    void deployBinary(DeploymentId deploymentId, List<String> files) throws 
Exception;

     /**
      * undeploy a certain deployment
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
index 99fc76b..b6d32a9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
@@ -21,16 +21,16 @@
 import java.io.Serializable;
 import java.util.concurrent.ThreadFactory;

+import org.apache.hyracks.api.application.IServerContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
 import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;
 import org.apache.hyracks.api.messages.IMessageBroker;
-import org.apache.hyracks.control.common.context.ServerContext;

 public abstract class ServiceContext implements IServiceContext {
-    protected final ServerContext serverCtx;
+    protected final IServerContext serverCtx;
     protected final IApplicationConfig appConfig;
     protected ThreadFactory threadFactory;
     protected Serializable distributedState;
@@ -38,7 +38,7 @@
     protected IJobSerializerDeserializerContainer jobSerDeContainer = new 
JobSerializerDeserializerContainer();
     protected IPersistedResourceRegistry persistedResourceRegistry;

-    public ServiceContext(ServerContext serverCtx, IApplicationConfig 
appConfig, ThreadFactory threadFactory) {
+    public ServiceContext(IServerContext serverCtx, IApplicationConfig 
appConfig, ThreadFactory threadFactory) {
         this.serverCtx = serverCtx;
         this.appConfig = appConfig;
         this.threadFactory = threadFactory;
@@ -88,4 +88,9 @@
     public IPersistedResourceRegistry getPersistedResourceRegistry() {
         return persistedResourceRegistry;
     }
+
+    @Override
+    public IServerContext getServerCtx() {
+        return serverCtx;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
index ff037f0..ef2777d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
@@ -20,18 +20,18 @@

 import java.io.File;

-public class ServerContext {
-    public enum ServerType {
-        CLUSTER_CONTROLLER,
-        NODE_CONTROLLER,
-    }
+import org.apache.hyracks.api.application.IServerContext;
+
+public class ServerContext implements IServerContext {

     private final ServerType type;
     private final File baseDir;
+    private final File appDir;

     public ServerContext(ServerType type, File baseDir) {
         this.type = type;
         this.baseDir = baseDir;
+        this.appDir = new File(baseDir, "applications");
     }

     public ServerType getServerType() {
@@ -41,4 +41,8 @@
     public File getBaseDir() {
         return baseDir;
     }
+
+    public File getAppDir() {
+        return appDir;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 31e6e39..f0d7828 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -73,7 +73,7 @@
         });
         try {
             if (classLoader == null) {
-                /** crate a new classloader */
+                /** create a new classloader */
                 URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
                 classLoader = new MutableURLClassLoader(urls, 
this.getClass().getClassLoader());
             } else {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
index a079d97..773ceaf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
@@ -26,7 +26,10 @@
 import java.io.OutputStream;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;

 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -86,8 +89,7 @@
      * @param container
      *            the container of serailizer/deserializer
      * @param ctx
-     *            the ServerContext
-     * @param isNC
+     *            the ServerContext * @param isNC
      *            true is NC/false is CC
      * @throws HyracksException
      */
@@ -101,7 +103,13 @@
         String rootDir = ctx.getBaseDir().toString();
         String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + 
DEPLOYMENT + File.separator + deploymentId
                 : rootDir + File.separator + DEPLOYMENT + File.separator + 
deploymentId;
-        jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC));
+        String[] fileSplits = urls.get(0).toString().split("\\.");
+        String extension = fileSplits[fileSplits.length - 1];
+        if (urls.size() == 1 && "zip".equalsIgnoreCase(extension)) {
+            downloadURLs(urls, deploymentDir, isNC, true);
+        } else {
+            jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC, 
false));
+        }
     }

     /**
@@ -176,7 +184,8 @@
      * @return a list of local file URLs
      * @throws HyracksException
      */
-    private static List<URL> downloadURLs(List<URL> urls, String 
deploymentDir, boolean isNC) throws HyracksException {
+    private static List<URL> downloadURLs(List<URL> urls, String 
deploymentDir, boolean isNC, boolean isZip)
+            throws HyracksException {
         //retry 10 times at maximum for downloading binaries
         int retryCount = 10;
         int tried = 0;
@@ -208,6 +217,9 @@
                             is.close();
                         }
                     }
+                    if (isZip) {
+                        unzip(targetFile.getAbsolutePath(), deploymentDir);
+                    }
                     downloadedFileURLs.add(targetFile.toURI().toURL());
                 }
                 return downloadedFileURLs;
@@ -218,4 +230,21 @@
         }
         throw HyracksException.create(trace);
     }
+
+    public static void unzip(String sourceFile, String outputDir) throws 
IOException {
+        try (ZipFile zipFile = new ZipFile(sourceFile)) {
+            Enumeration<? extends ZipEntry> entries = zipFile.entries();
+            while (entries.hasMoreElements()) {
+                ZipEntry entry = entries.nextElement();
+                File entryDestination = new File(outputDir, entry.getName());
+                if (!entry.isDirectory()) {
+                    entryDestination.getParentFile().mkdirs();
+                    try (InputStream in = zipFile.getInputStream(entry);
+                            OutputStream out = new 
FileOutputStream(entryDestination)) {
+                        IOUtils.copy(in, out);
+                    }
+                }
+            }
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 517169b..bf07c20 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -222,7 +222,7 @@
             deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
             timer = new Timer(true);
             serverCtx = new 
ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
-                    new File(new File(NodeControllerService.class.getName()), 
id));
+                    new File(ioManager.getWorkspacePath(0), id));
             getNodeControllerInfosAcceptor = new MutableObject<>();
             memoryManager =
                     new MemoryManager((long) 
(memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 14404d2..c335312 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -346,6 +346,10 @@
         return dev.createFileRef(waPath + File.separator + waf.getName());
     }

+    public String getWorkspacePath(int index) {
+        return workspaces.get(index) != null ? 
workspaces.get(index).getWorkspace() : null;
+    }
+
     @Override
     public void sync(IFileHandle fileHandle, boolean metadata) throws 
HyracksDataException {
         try {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index 29d12ce..9351348 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -214,21 +214,27 @@
     }

     @Override
-    public DeploymentId deployBinary(List<String> jars) throws Exception {
+    public DeploymentId deployBinary(List<String> files) throws Exception {
         /** generate a deployment id */
         DeploymentId deploymentId = new 
DeploymentId(UUID.randomUUID().toString());
+        deployBinary(deploymentId, files);
+        return deploymentId;
+    }
+
+    @Override
+    public void deployBinary(DeploymentId deploymentId, List<String> files) 
throws Exception {
         List<URL> binaryURLs = new ArrayList<>();
-        if (jars != null && !jars.isEmpty()) {
+        if (files != null && !files.isEmpty()) {
             CloseableHttpClient hc = new DefaultHttpClient();
             try {
-                /** upload jars through a http client one-by-one to the CC server */
-                for (String jar : jars) {
-                    int slashIndex = jar.lastIndexOf('/');
-                    String fileName = jar.substring(slashIndex + 1);
+                /** upload files through a http client one-by-one to the CC server */
+                for (String file : files) {
+                    int slashIndex = file.lastIndexOf('/');
+                    String fileName = file.substring(slashIndex + 1);
                     String url = "http://" + ccHost + ":" + 
ccInfo.getWebPort() + "/applications/"
                             + deploymentId.toString() + "&" + fileName;
                     HttpPut put = new HttpPut(url);
-                    put.setEntity(new FileEntity(new File(jar), 
"application/octet-stream"));
+                    put.setEntity(new FileEntity(new File(file), 
"application/octet-stream"));
                     HttpResponse response = hc.execute(put);
                     response.getEntity().consumeContent();
                     if (response.getStatusLine().getStatusCode() != 200) {
@@ -244,7 +250,6 @@
         }
         /** deploy the URLs to the CC and NCs */
         hci.deployBinary(binaryURLs, deploymentId);
-        return deploymentId;
     }

     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
index 4417795..63126ab 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.ThreadFactory;

 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.application.IServerContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
 import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -152,4 +153,9 @@
     public Object getApplicationContext() {
         return appCtx;
     }
+
+    @Override
+    public IServerContext getServerCtx() {
+        return null;
+    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/3386
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6be9fef54c010bdb32f5c78af9b973f9843f442f
Gerrit-Change-Number: 3386
Gerrit-PatchSet: 1
Gerrit-Owner: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>

Reply via email to