From Ian Maxon <[email protected]>: Ian Maxon has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18774 )
Change subject: [ASTERIXDB-3495] Library Cloud Storage ...................................................................... [ASTERIXDB-3495] Library Cloud Storage Details: - Use the cloud IO manager to persist library archives - Unzip them onto local storage as part of library deployment Change-Id: I2c173fbdc985ea590da5315c1ce18e7610fb6af0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18774 Reviewed-by: Peeyush Gupta <[email protected]> Reviewed-by: Ian Maxon <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java 12 files changed, 270 insertions(+), 60 deletions(-) Approvals: Ian Maxon: Looks good to me, but someone else must approve Peeyush Gupta: Looks good to me, approved Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java index 74beb46..76d91cf 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java @@ -20,6 +20,8 @@ import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH; +import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME; +import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor; import java.io.FileOutputStream; import java.io.IOException; @@ -28,6 +30,8 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.net.URI; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,6 +54,7 @@ import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.functions.ExternalFunctionLanguage; import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.library.LibraryDescriptor; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; @@ -60,6 +65,9 @@ import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import 
org.apache.hyracks.http.server.utils.HttpUtil; @@ -223,6 +231,47 @@ } } + private void writeLibToCloud(LibraryUploadData uploadData, Namespace libNamespace, String libName, + MessageDigest digest, ExternalFunctionLanguage language) throws IOException { + FileReference libDir = libraryManager.getLibraryDir(libNamespace, libName); + IIOManager cloudIoMgr = libraryManager.getCloudIOManager(); + FileReference lib = libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME); + if (!libDir.getFile().exists()) { + Files.createDirectories(lib.getFile().toPath().getParent()); + } + Files.createFile(lib.getFile().toPath()); + IFileHandle fh = cloudIoMgr.open(lib, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh); + byte[] writeBuf = new byte[4096]; + FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME); + try (OutputStream outputStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest); + InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) { + IOUtils.copyLarge(ui, outputStream, writeBuf); + outputStream.flush(); + cloudIoMgr.sync(fh, true); + writeDescriptor(libraryManager, targetDescFile, + new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), true, writeBuf); + } finally { + cloudIoMgr.close(fh); + } + } + + private URI cacheLibAndDistribute(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt, + MessageDigest digest) throws Exception { + Path libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libDv.getCanonicalForm() + "." 
+ + libName); + } + FileOutputStream libTmpOut = new FileOutputStream(libraryTempFile.toFile()); + try (OutputStream os = new DigestOutputStream(libTmpOut, digest); + InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) { + IOUtils.copyLarge(ui, os); + } + return createDownloadURI(libraryTempFile); + } + private void handleModification(IServletRequest request, IServletResponse response, LibraryOperation op) { HttpRequest httpRequest = request.getHttpRequest(); Path libraryTempFile = null; @@ -240,20 +289,16 @@ LibraryUploadData uploadData = decodeMultiPartLibraryOptions(requestDecoder); ExternalFunctionLanguage language = uploadData.type; String fileExt = FilenameUtils.getExtension(uploadData.fileUpload.getFilename()); - libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Created temporary file " + libraryTempFile + " for library " - + libDv.getCanonicalForm() + "." + libName); - } MessageDigest digest = MessageDigest.getInstance("MD5"); - libTmpOut = new FileOutputStream(libraryTempFile.toFile()); - try (OutputStream os = new DigestOutputStream(libTmpOut, digest); - InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) { - IOUtils.copyLarge(ui, os); + if (appCtx.isCloudDeployment()) { + writeLibToCloud(uploadData, libNamespace, libName, digest, language); + doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), null, + true, getSysAuthHeader(), requestReference, request); + } else { + URI downloadURI = cacheLibAndDistribute(uploadData, libDv, libName, fileExt, digest); + doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), + downloadURI, true, getSysAuthHeader(), requestReference, request); } - URI downloadURI = createDownloadURI(libraryTempFile); - doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), downloadURI, - true, 
getSysAuthHeader(), requestReference, request); } else if (op == LibraryOperation.DELETE) { //DELETE semantics imply ifExists doDrop(libNamespace, libName, false, requestReference, request); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 343baf0..1f08a5f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -335,8 +335,8 @@ NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService(); FileReference appDir = - ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath()); - libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, ioManager); + persistenceIOManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getPath()); + libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, persistenceIOManager); libraryManager.initialize(resetStorageData); /* diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index e1b4bb0..f7a6dc1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -3753,7 +3753,9 @@ // #. create library artifacts in NCs. runJob(hcc, prepareJobSpec, jobFlags); prepareJobSuccessful = true; - runJob(hcc, commitJobSpec, jobFlags); + if (!appCtx.isCloudDeployment()) { + runJob(hcc, commitJobSpec, jobFlags); + } // #. 
begin new metadataTxn mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 912ca47..e4224f2 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.cloud; +import static org.apache.asterix.common.utils.StorageConstants.APPLICATION_ROOT_DIR_NAME; import static org.apache.asterix.common.utils.StorageConstants.METADATA_PARTITION; import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX; import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME; @@ -28,6 +29,8 @@ import java.nio.ByteBuffer; import java.nio.file.FileStore; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -40,6 +43,7 @@ import org.apache.asterix.cloud.clients.ICloudClient; import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.ICloudWriter; +import org.apache.asterix.cloud.clients.IParallelDownloader; import org.apache.asterix.cloud.util.CloudFileUtil; import org.apache.asterix.common.api.INamespacePathResolver; import org.apache.asterix.common.cloud.IPartitionBootstrapper; @@ -51,6 +55,7 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IFileHandle; import org.apache.hyracks.api.io.IIOBulkOperation; +import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.cloud.filesystem.PhysicalDrive; @@ -146,6 +151,7 @@ // Has different implementations depending on the caching 
policy downloadPartitions(metadataNode, metadataPartition); + downloadAllLibraries(); } private void deleteUnkeptPartitionDirs(List<FileReference> currentOnDiskPartitions) throws HyracksDataException { @@ -181,6 +187,23 @@ protected abstract Set<UncachedFileReference> getUncachedFiles(); + @Override + public void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException { + IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager); + LOGGER.info("Downloading all files located in {}", libPath); + downloader.downloadDirectories(libPath); + LOGGER.info("Finished downloading {}", libPath); + } + + public void downloadAllLibraries() throws HyracksDataException { + IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager); + FileReference appDir = resolveAbsolutePath( + localIoManager.getWorkspacePath(0).getPath() + File.separator + APPLICATION_ROOT_DIR_NAME); + LOGGER.info("Downloading all libraries in + {}", appDir); + downloader.downloadDirectories(Collections.singletonList(appDir)); + LOGGER.info("Finished downloading all libraries"); + } + /* * ****************************************************************** * ICloudIOManager functions @@ -495,4 +518,9 @@ public long getTotalDiskUsage() { return PhysicalDrive.getUsedSpace(drivePaths); } + + @Override + public IIOManager getLocalIOManager() { + return localIoManager; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java index 2098060..db87c60 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java @@ -33,11 +33,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.ipc.impl.IPCSystem; public interface ILibraryManager { + String LIBRARY_ARCHIVE_NAME = "library_archive.zip"; + List<Pair<Namespace, String>> getLibraryListing() throws IOException; String getLibraryHash(Namespace namespace, String libraryName) throws IOException; @@ -70,7 +73,12 @@ void unzip(FileReference sourceFile, FileReference outputDir) throws IOException; - void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuf) throws IOException; + void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer, IIOManager localIoManager) + throws IOException; void setUploadClient(Function<ILibraryManager, CloseableHttpClient> f); + + void writeShim(FileReference outputFile, byte[] copyBuf) throws IOException; + + IIOManager getCloudIOManager(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index bd67b11..627f170 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.utils; +import static org.apache.hyracks.control.common.context.ServerContext.APP_DIR_NAME; + import java.util.LinkedHashMap; import java.util.Map; @@ -33,6 +35,7 @@ public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs"; public static final String GLOBAL_TXN_DIR_NAME = "."; public static final String STORAGE_ROOT_DIR_NAME = "storage"; + public static final String APPLICATION_ROOT_DIR_NAME = APP_DIR_NAME; public static final String INGESTION_LOGS_DIR_NAME = 
"ingestion_logs"; public static final String PARTITION_DIR_PREFIX = "partition_"; /** diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 300e723..de236ba 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -599,6 +599,11 @@ <artifactId>avro-mapred</artifactId> <version>1.12.0</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-cloud</artifactId> + <version>0.3.10-SNAPSHOT</version> + </dependency> </dependencies> <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 --> <repositories> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java index 47a1768..f9dc118 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java @@ -20,7 +20,10 @@ import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY; import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.ENTRYPOINT; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -93,6 +96,7 @@ import org.apache.hyracks.api.network.INetworkSecurityConfig; import org.apache.hyracks.api.network.INetworkSecurityManager; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.cloud.io.ICloudIOManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; import 
org.apache.hyracks.ipc.impl.IPCSystem; @@ -123,7 +127,7 @@ public static final String CONTENTS_DIR_NAME = "contents"; - public static final String DESCRIPTOR_FILE_NAME = "lib.json"; + public static final String DESCRIPTOR_FILE_NAME = "desc.json"; public static final String DISTRIBUTION_DIR = "dist"; @@ -140,13 +144,13 @@ private final FileReference trashDir; private final FileReference distDir; private final Path trashDirPath; - //TODO(DB): change for database private final Map<Pair<Namespace, String>, ILibrary> libraries = new HashMap<>(); private IPCSystem pythonIPC; private final ExternalFunctionResultRouter router; private final IIOManager ioManager; private final INamespacePathResolver namespacePathResolver; private final boolean sslEnabled; + private final boolean cloudMode; private Function<ILibraryManager, CloseableHttpClient> uploadClientSupp; public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir, @@ -165,6 +169,7 @@ this.sslEnabled = ncs.getConfiguration().isSslEnabled(); this.ioManager = ioManager; uploadClientSupp = ExternalLibraryManager::defaultHttpClient; + cloudMode = ncs.getConfiguration().isCloudDeployment(); } public void initialize(boolean resetStorageData) throws HyracksDataException { @@ -216,6 +221,13 @@ @Override public void start() { + if (cloudMode) { + try { + unzipAllLibs(baseDir); + } catch (IOException e) { + LOGGER.error("Failed to unzip all libraries", e); + } + } } @Override @@ -470,17 +482,47 @@ return outZip; } + private void unzipAllLibs(FileReference libDir) throws IOException { + byte[] copyBuf = new byte[4096]; + Files.walkFileTree(libDir.getFile().toPath(), new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path currPath, BasicFileAttributes attrs) throws IOException { + if (currPath.getFileName().toString().equals(LIBRARY_ARCHIVE_NAME)) { + FileReference lib = ioManager.resolveAbsolutePath(currPath.toString()); + FileReference content = 
lib.getParent().getChild(REV_1_DIR_NAME).getChild(CONTENTS_DIR_NAME); + if (!content.getFile().exists()) { + FileUtils.forceMkdir(content.getFile()); + } + unzip(lib, content); + writeShim(content.getChild(ENTRYPOINT), copyBuf); + } else if (currPath.getFileName().toString().equals(DESCRIPTOR_FILE_NAME)) { + Path revDir = currPath.resolveSibling(REV_1_DIR_NAME); + if (!revDir.toFile().exists()) { + FileUtils.forceMkdir(revDir.toFile()); + } + Files.copy(currPath, currPath.resolveSibling(REV_1_DIR_NAME).resolve(DESCRIPTOR_FILE_NAME), + REPLACE_EXISTING); + } + return FileVisitResult.CONTINUE; + } + }); + } + @Override public void dropLibraryPath(FileReference fileRef) throws HyracksDataException { - // does not flush any directories try { Path path = fileRef.getFile().toPath(); - Path trashPath = Files.createTempDirectory(trashDirPath, null); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Drop (move) {} into {}", path, trashPath); + if (ncs.getConfiguration().isCloudDeployment()) { + ioManager.delete(fileRef.getChild(LIBRARY_ARCHIVE_NAME)); + ioManager.delete(fileRef.getChild(DESCRIPTOR_FILE_NAME)); + } else { + Path trashPath = Files.createTempDirectory(trashDirPath, null); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Drop (move) {} into {}", path, trashPath); + } + Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE); + ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath)); } - Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE); - ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath)); } catch (IOException e) { throw HyracksDataException.create(e); } @@ -614,6 +656,10 @@ @Override public void unzip(FileReference sourceFile, FileReference outputDir) throws IOException { boolean logTraceEnabled = LOGGER.isTraceEnabled(); + IIOManager localIoManager = ioManager; + if (ncs.getConfiguration().isCloudDeployment()) { + localIoManager = ((ICloudIOManager) ioManager).getLocalIOManager(); + } Set<Path> newDirs = new HashSet<>(); Path 
outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize(); try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) { @@ -635,11 +681,11 @@ newDirs.add(p); } try (InputStream in = zipFile.getInputStream(entry)) { - FileReference entryOutputFileRef = ioManager.resolveAbsolutePath(entryOutputPath.toString()); + FileReference entryOutputFileRef = localIoManager.resolveAbsolutePath(entryOutputPath.toString()); if (logTraceEnabled) { LOGGER.trace("Extracting file {}", entryOutputFileRef); } - writeAndForce(entryOutputFileRef, in, writeBuf); + writeAndForce(entryOutputFileRef, in, writeBuf, localIoManager); } } } @@ -649,17 +695,18 @@ } @Override - public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer) throws IOException { + public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer, + IIOManager localIoManager) throws IOException { outputFile.getFile().createNewFile(); - IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE, + IFileHandle fHandle = localIoManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle); + WritableByteChannel outChannel = localIoManager.newWritableChannel(fHandle); try (OutputStream outputStream = Channels.newOutputStream(outChannel)) { IOUtils.copyLarge(dataStream, outputStream, copyBuffer); outputStream.flush(); - ioManager.sync(fHandle, true); + localIoManager.sync(fHandle, true); } finally { - ioManager.close(fHandle); + localIoManager.close(fHandle); } } @@ -696,4 +743,33 @@ } } + @Override + public void writeShim(FileReference outputFile, byte[] copyBuf) throws IOException { + InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName()); + if (is == null) { + throw new IOException("Classpath does not contain necessary Python resources!"); + } + try 
{ + if (ncs.getConfiguration().isCloudDeployment()) { + writeAndForce(outputFile, is, copyBuf, ((ICloudIOManager) ioManager).getLocalIOManager()); + } else { + writeAndForce(outputFile, is, copyBuf, ioManager); + } + } finally { + is.close(); + } + } + + @Override + public IIOManager getCloudIOManager() { + return ioManager; + } + + public static void writeDescriptor(ILibraryManager libraryManager, FileReference descFile, LibraryDescriptor desc, + boolean cloud, byte[] copyBuf) throws IOException { + byte[] bytes = libraryManager.serializeLibraryDescriptor(desc); + libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf, + libraryManager.getCloudIOManager()); + } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java index b254836..e9c7d72 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.cloud.io.ICloudIOManager; import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; @@ -61,10 +62,14 @@ protected IIOManager ioManager; + protected ICloudIOManager cloudIoManager; + protected ILibraryManager libraryManager; private FileReference libraryDir; + protected boolean cloudMode = false; + protected AbstractLibraryNodePushable(IHyracksTaskContext ctx) { this.ctx = ctx; } @@ -75,9 +80,13 @@ public final void initialize() throws 
HyracksDataException { INcApplicationContext runtimeCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); - ioManager = runtimeCtx.getIoManager(); + ioManager = runtimeCtx.getPersistenceIoManager(); libraryManager = runtimeCtx.getLibraryManager(); libraryDir = libraryManager.getLibraryDir(namespace, libraryName); + if (runtimeCtx.isCloudDeployment()) { + cloudMode = true; + cloudIoManager = (ICloudIOManager) ioManager; + } try { execute(); } catch (IOException e) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java index 0c12d43..638444d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java @@ -19,22 +19,26 @@ package org.apache.asterix.external.operators; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.asterix.common.library.ILibraryManager.LIBRARY_ARCHIVE_NAME; +import static org.apache.asterix.external.library.ExternalLibraryManager.CONTENTS_DIR_NAME; import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME; +import static org.apache.asterix.external.library.ExternalLibraryManager.REV_1_DIR_NAME; +import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor; +import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.ENTRYPOINT; import static org.apache.hyracks.control.common.controllers.NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; -import java.io.InputStream; import 
java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.security.MessageDigest; +import java.util.Collections; import org.apache.asterix.common.functions.ExternalFunctionLanguage; import org.apache.asterix.common.library.LibraryDescriptor; import org.apache.asterix.common.metadata.Namespace; -import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.external.util.ExternalLibraryUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -48,7 +52,7 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOperatorDescriptor { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class); @@ -71,13 +75,30 @@ private final byte[] copyBuf = new byte[4096]; + private void cloudDeploy() throws IOException { + FileReference libDir = getLibraryDir(); + libDir = new FileReference(libDir.getDeviceHandle(), FilenameUtils.normalize(libDir.getRelativePath())); + cloudIoManager.downloadLibrary(Collections.singletonList(libDir)); + FileReference content = libDir.getChild(REV_1_DIR_NAME).getChild(CONTENTS_DIR_NAME); + libraryManager.unzip(libDir.getChild(LIBRARY_ARCHIVE_NAME), content); + libraryManager.writeShim(content.getChild(ENTRYPOINT), copyBuf); + Files.copy(libDir.getChild(DESCRIPTOR_FILE_NAME).getFile().toPath(), + content.getParent().getChild(DESCRIPTOR_FILE_NAME).getFile().toPath(), REPLACE_EXISTING); + } + @Override protected void execute() throws IOException { if (LOGGER.isInfoEnabled()) { LOGGER.info("Prepare deployment of library {}.{}", namespace, libraryName); } - // #. create library dir if necessary, clean 'stage' dir + if (libLocation == null && cloudMode) { + cloudDeploy(); + return; + } + + //#. 
create library dir if necessary, clean 'stage' dir + FileReference libDir = getLibraryDir(); Path libDirPath = libDir.getFile().toPath(); @@ -121,7 +142,7 @@ } MessageDigest digest = libraryManager.download(targetFile, authToken, libLocation); // extract from the archive - FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME); + FileReference contentsDir = stageDir.getChild(CONTENTS_DIR_NAME); mkdir(contentsDir); if (LOGGER.isDebugEnabled()) { @@ -147,8 +168,9 @@ if (LOGGER.isTraceEnabled()) { LOGGER.trace("Writing library descriptor into {}", targetDescFile); } - writeDescriptor(targetDescFile, - new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest))); + writeDescriptor(libraryManager, targetDescFile, + new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), false, + copyBuf); flushDirectory(contentsDir); flushDirectory(stageDir); @@ -158,7 +180,7 @@ boolean writeMsgpack) throws IOException { FileReference msgpack = stageDir.getChild("msgpack.pyz"); if (writeMsgpack) { - writeShim(msgpack); + libraryManager.writeShim(msgpack, copyBuf); File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc"); FileReference msgPackFolderRef = new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath()); @@ -166,24 +188,7 @@ Files.delete(msgpack.getFile().toPath()); } libraryManager.unzip(sourceFile, contentsDir); - writeShim(contentsDir.getChild("entrypoint.py")); - } - - private void writeShim(FileReference outputFile) throws IOException { - InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName()); - if (is == null) { - throw new IOException("Classpath does not contain necessary Python resources!"); - } - try { - libraryManager.writeAndForce(outputFile, is, copyBuf); - } finally { - is.close(); - } - } - - private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException { - byte[] bytes = 
libraryManager.serializeLibraryDescriptor(desc); - libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf); + libraryManager.writeShim(contentsDir.getChild(ENTRYPOINT), copyBuf); } }; diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java index 9b6a2ff..62e0829 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java @@ -19,9 +19,12 @@ package org.apache.hyracks.cloud.io; import java.nio.ByteBuffer; +import java.util.Collection; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest; import org.apache.hyracks.cloud.io.request.ICloudRequest; import org.apache.hyracks.cloud.io.stream.CloudInputStream; @@ -32,6 +35,8 @@ * file operations in a cloud deployment. 
*/ public interface ICloudIOManager { + void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException; + /** * Read from the cloud * @@ -105,4 +110,6 @@ * @param resourcePath to evict */ void evict(String resourcePath) throws HyracksDataException; + + IIOManager getLocalIOManager(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index 960f23b..37e1477 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -382,7 +382,7 @@ } private File getWorkspaceFolder(IODeviceHandle dev) { - return new File(dev.getMount(), dev.getWorkspace()); + return Path.of(dev.getMount().getPath(), dev.getWorkspace()).normalize().toFile(); } @Override @@ -491,8 +491,12 @@ @Override public int write(ByteBuffer src) throws IOException { + int origPos = src.position(); int written = IOManager.this.syncWrite(fHandle, position, src); position += written; + if (src.position() < origPos + written) { + src.position(origPos + written); + } return written; } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18774 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I2c173fbdc985ea590da5315c1ce18e7610fb6af0 Gerrit-Change-Number: 18774 Gerrit-PatchSet: 18 Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]> Gerrit-MessageType: merged
