Stage the pipeline without using a temp file
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/090c5124 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/090c5124 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/090c5124 Branch: refs/heads/master Commit: 090c512457e25c965efab2d6c849f1a50e03e052 Parents: aea0c60 Author: Kenneth Knowles <k...@apache.org> Authored: Tue Oct 17 16:06:05 2017 -0700 Committer: Kenneth Knowles <k...@apache.org> Committed: Wed Oct 18 13:02:25 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 22 +--- .../beam/runners/dataflow/util/GcsStager.java | 29 +++-- .../beam/runners/dataflow/util/PackageUtil.java | 116 ++++++++++++++----- .../beam/runners/dataflow/util/Stager.java | 5 + 4 files changed, 111 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 6dbc4af..ecef072 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -41,7 +41,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.net.URISyntaxException; @@ -191,10 +190,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024; @VisibleForTesting - static final String PIPELINE_FILE_NAME = "pipeline"; - - @VisibleForTesting - static final String SERIALIZED_PROTOBUF_EXTENSION = ".pb"; + static final String PIPELINE_FILE_NAME = "pipeline.pb"; private static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url"; @@ -526,22 +522,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { List<DataflowPackage> packages = options.getStager().stageDefaultFiles(); - RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline); - File serializedProtoPipeline; - try { - serializedProtoPipeline = - File.createTempFile(PIPELINE_FILE_NAME, SERIALIZED_PROTOBUF_EXTENSION); - protoPipeline.writeDelimitedTo(new FileOutputStream(serializedProtoPipeline)); - } catch (IOException e) { - throw new RuntimeException(e); - } - + byte[] serializedProtoPipeline = PipelineTranslation.toProto(pipeline).toByteArray(); LOG.info("Staging pipeline description to {}", options.getStagingLocation()); DataflowPackage stagedPipeline = - options - .getStager() - .stageFiles(ImmutableList.of(serializedProtoPipeline.getAbsolutePath())) - .get(0); + options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME); // Set a unique client_request_id in the CreateJob request. // This is used to ensure idempotence of job creation across retried http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index ff205f0..7ed78e8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -72,19 +72,28 @@ public class GcsStager implements Stager { */ @Override public List<DataflowPackage> stageFiles(List<String> filesToStage) { + try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) { + return packageUtil.stageClasspathElements( + filesToStage, options.getStagingLocation(), buildCreateOptions()); + } + } + + @Override + public DataflowPackage stageToFile(byte[] bytes, String baseName) { + try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) { + return packageUtil.stageToFile( + bytes, baseName, options.getStagingLocation(), buildCreateOptions()); + } + } + + private GcsCreateOptions buildCreateOptions() { int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024); checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0"); uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024); - GcsCreateOptions createOptions = - GcsCreateOptions.builder() - .setGcsUploadBufferSizeBytes(uploadSizeBytes) - .setMimeType(MimeTypes.BINARY) - .build(); - - try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) { - return packageUtil.stageClasspathElements( - filesToStage, options.getStagingLocation(), createOptions); - } + return GcsCreateOptions.builder() + .setGcsUploadBufferSizeBytes(uploadSizeBytes) + .setMimeType(MimeTypes.BINARY) + .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 449b36d..565e965 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.util; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import com.fasterxml.jackson.core.Base64Variants; import com.google.api.client.util.BackOff; @@ -29,6 +30,7 @@ import com.google.common.base.Function; import com.google.common.hash.Funnels; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; +import com.google.common.io.ByteSource; import com.google.common.io.CountingOutputStream; import com.google.common.io.Files; import com.google.common.util.concurrent.AsyncFunction; @@ -51,6 +53,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions; import org.apache.beam.sdk.io.FileSystems; @@ -182,11 +185,11 @@ class PackageUtil implements Closeable { private StagingResult stagePackageSynchronously( PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions) throws IOException, InterruptedException { - File source = attributes.getSource(); + String sourceDescription = attributes.getSourceDescription(); String target = attributes.getDestination().getLocation(); if (alreadyStaged(attributes)) { - LOG.debug("Skipping file already staged: {} at {}", source, target); + LOG.debug("Skipping file already staged: {} at {}", sourceDescription, target); return StagingResult.cached(attributes); } @@ -194,14 +197,14 @@ class PackageUtil implements Closeable { return tryStagePackageWithRetry(attributes, retrySleeper, createOptions); } catch (Exception miscException) { throw new RuntimeException( - String.format("Could not stage %s to %s", source, target), miscException); + String.format("Could not stage %s to %s", sourceDescription, target), miscException); } } private StagingResult tryStagePackageWithRetry( PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions) throws IOException, InterruptedException { - File source = attributes.getSource(); + String sourceDescription = attributes.getSourceDescription(); String target = attributes.getDestination().getLocation(); BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); @@ -217,19 +220,22 @@ class PackageUtil implements Closeable { + "of %s. Please verify credentials are valid and that you have " + "write access to %s. Stale credentials can be resolved by executing " + "'gcloud auth application-default login'.", - source, target); + sourceDescription, target); LOG.error(errorMessage); throw new IOException(errorMessage, ioException); } long sleep = backoff.nextBackOffMillis(); if (sleep == BackOff.STOP) { - LOG.error("Upload failed, will NOT retry staging of package: {}", source, ioException); + LOG.error( + "Upload failed, will NOT retry staging of package: {}", + sourceDescription, + ioException); throw new RuntimeException("Could not stage %s to %s", ioException); } else { LOG.warn( "Upload attempt failed, sleeping before retrying staging of package: {}", - source, + sourceDescription, ioException); retrySleeper.sleep(sleep); } @@ -237,16 +243,29 @@ class PackageUtil implements Closeable { } } - private StagingResult tryStagePackage( - PackageAttributes attributes, CreateOptions createOptions) + private StagingResult tryStagePackage(PackageAttributes attributes, CreateOptions createOptions) throws IOException, InterruptedException { - File source = attributes.getSource(); + String sourceDescription = attributes.getSourceDescription(); String target = attributes.getDestination().getLocation(); - LOG.info("Uploading {} to {}", source, target); + LOG.info("Uploading {} to {}", sourceDescription, target); try (WritableByteChannel writer = FileSystems.create(FileSystems.matchNewResource(target, false), createOptions)) { - copyContent(attributes.getSource(), writer); + if (attributes.getBytes() != null) { + ByteSource.wrap(attributes.getBytes()).copyTo(Channels.newOutputStream(writer)); + } else { + File sourceFile = attributes.getSource(); + checkState( + sourceFile != null, + "Internal inconsistency: we tried to stage something to %s, but neither a source file " + + "nor the byte content was specified", + target); + if (sourceFile.isDirectory()) { + ZipFiles.zipDirectory(sourceFile, Channels.newOutputStream(writer)); + } else { + Files.asByteSource(sourceFile).copyTo(Channels.newOutputStream(writer)); + } + } } return StagingResult.uploaded(attributes); } @@ -272,6 +291,24 @@ class PackageUtil implements Closeable { classpathElements, stagingPath, DEFAULT_SLEEPER, DEFAULT_CREATE_OPTIONS); } + public DataflowPackage stageToFile( + byte[] bytes, String target, String stagingPath, CreateOptions createOptions) { + try { + return stagePackage( + PackageAttributes.forBytesToStage(bytes, target, stagingPath), + DEFAULT_SLEEPER, + createOptions) + .get() + .getPackageAttributes() + .getDestination(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while staging pipeline", e); + } catch (ExecutionException e) { + throw new RuntimeException("Error while staging pipeline", e.getCause()); + } + } + /** * Transfers the classpath elements to the staging location. * @@ -386,23 +423,6 @@ class PackageUtil implements Closeable { return fileName + "-" + contentHash + "." + fileExtension; } - /** - * Copies the contents of the classpathElement to the output channel. - * - * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly, - * otherwise the file contents are copied as-is. - * - * <p>The output channel is not closed. - */ - private static void copyContent(File classpathElement, WritableByteChannel outputChannel) - throws IOException { - if (classpathElement.isDirectory()) { - ZipFiles.zipDirectory(classpathElement, Channels.newOutputStream(outputChannel)); - } else { - Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel)); - } - } - @AutoValue abstract static class StagingResult { abstract PackageAttributes getPackageAttributes(); @@ -456,7 +476,26 @@ class PackageUtil implements Closeable { target.setName(uniqueName); target.setLocation(resourcePath); - return new AutoValue_PackageUtil_PackageAttributes(source, target, size, hash); + return new AutoValue_PackageUtil_PackageAttributes(source, null, target, size, hash); + } + + public static PackageAttributes forBytesToStage( + byte[] bytes, String targetName, String stagingPath) { + Hasher hasher = Hashing.md5().newHasher(); + String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.putBytes(bytes).hash().asBytes()); + long size = bytes.length; + + String uniqueName = getUniqueContentName(new File(targetName), hash); + + String resourcePath = + FileSystems.matchNewResource(stagingPath, true) + .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE) + .toString(); + DataflowPackage target = new DataflowPackage(); + target.setName(uniqueName); + target.setLocation(resourcePath); + + return new AutoValue_PackageUtil_PackageAttributes(null, bytes, target, size, hash); } public PackageAttributes withPackageName(String overridePackageName) { @@ -465,12 +504,17 @@ class PackageUtil implements Closeable { newDestination.setLocation(getDestination().getLocation()); return new AutoValue_PackageUtil_PackageAttributes( - getSource(), newDestination, getSize(), getHash()); + getSource(), getBytes(), newDestination, getSize(), getHash()); } - /** @return the file to be uploaded */ + /** @return the file to be uploaded, if any */ + @Nullable public abstract File getSource(); + /** @return the bytes to be uploaded, if any */ + @Nullable + public abstract byte[] getBytes(); + /** @return the dataflowPackage */ public abstract DataflowPackage getDestination(); @@ -479,5 +523,13 @@ class PackageUtil implements Closeable { /** @return the hash */ public abstract String getHash(); + + public String getSourceDescription() { + if (getSource() != null) { + return getSource().toString(); + } else { + return String.format("<%s bytes, hash %s>", getSize(), getHash()); + } + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java index f0be941..0b2013e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java @@ -43,4 +43,9 @@ public interface Stager { * contents of the staged file. */ List<DataflowPackage> stageFiles(List<String> filesToStage); + + /** + * Stage bytes to a target file name wherever this stager stages things. + */ + DataflowPackage stageToFile(byte[] bytes, String baseName); }