Stage the pipeline without using a temp file

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/090c5124
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/090c5124
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/090c5124

Branch: refs/heads/master
Commit: 090c512457e25c965efab2d6c849f1a50e03e052
Parents: aea0c60
Author: Kenneth Knowles <k...@apache.org>
Authored: Tue Oct 17 16:06:05 2017 -0700
Committer: Kenneth Knowles <k...@apache.org>
Committed: Wed Oct 18 13:02:25 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  22 +---
 .../beam/runners/dataflow/util/GcsStager.java   |  29 +++--
 .../beam/runners/dataflow/util/PackageUtil.java | 116 ++++++++++++++-----
 .../beam/runners/dataflow/util/Stager.java      |   5 +
 4 files changed, 111 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6dbc4af..ecef072 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -41,7 +41,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URISyntaxException;
@@ -191,10 +190,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024;
 
   @VisibleForTesting
-  static final String PIPELINE_FILE_NAME = "pipeline";
-
-  @VisibleForTesting
-  static final String SERIALIZED_PROTOBUF_EXTENSION = ".pb";
+  static final String PIPELINE_FILE_NAME = "pipeline.pb";
 
   private static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
 
@@ -526,22 +522,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
 
-    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline);
-    File serializedProtoPipeline;
-    try {
-      serializedProtoPipeline =
-          File.createTempFile(PIPELINE_FILE_NAME, SERIALIZED_PROTOBUF_EXTENSION);
-      protoPipeline.writeDelimitedTo(new FileOutputStream(serializedProtoPipeline));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
+    byte[] serializedProtoPipeline = PipelineTranslation.toProto(pipeline).toByteArray();
     LOG.info("Staging pipeline description to {}", options.getStagingLocation());
     DataflowPackage stagedPipeline =
-        options
-            .getStager()
-            .stageFiles(ImmutableList.of(serializedProtoPipeline.getAbsolutePath()))
-            .get(0);
+        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
 
     // Set a unique client_request_id in the CreateJob request.
     // This is used to ensure idempotence of job creation across retried
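
The change above replaces the temp-file round trip (File.createTempFile plus writeDelimitedTo) with an in-memory byte[] handed straight to the stager. As a rough, self-contained illustration of that pattern outside Beam, the sketch below copies an in-memory payload directly to a destination channel using only the JDK and Guava; the payload text and the local destination path are made-up stand-ins for the serialized pipeline proto and the GCS staging object.

    import com.google.common.io.ByteSource;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class StageBytesSketch {
      public static void main(String[] args) throws IOException {
        // Stand-in for RunnerApi.Pipeline#toByteArray(): any in-memory payload.
        byte[] payload = "serialized pipeline proto goes here".getBytes(StandardCharsets.UTF_8);

        // Stand-in for the staging destination opened via FileSystems.create(...).
        Path destination = Files.createTempFile("pipeline-", ".pb");
        try (WritableByteChannel writer =
            Files.newByteChannel(
                destination, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
          // Copy the bytes straight to the channel; no intermediate source file is materialized.
          ByteSource.wrap(payload).copyTo(Channels.newOutputStream(writer));
        }
        System.out.println("Wrote " + payload.length + " bytes to " + destination);
      }
    }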

http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index ff205f0..7ed78e8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -72,19 +72,28 @@ public class GcsStager implements Stager {
    */
   @Override
   public List<DataflowPackage> stageFiles(List<String> filesToStage) {
+    try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
+      return packageUtil.stageClasspathElements(
+          filesToStage, options.getStagingLocation(), buildCreateOptions());
+    }
+  }
+
+  @Override
+  public DataflowPackage stageToFile(byte[] bytes, String baseName) {
+    try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
+      return packageUtil.stageToFile(
+          bytes, baseName, options.getStagingLocation(), buildCreateOptions());
+    }
+  }
+
+  private GcsCreateOptions buildCreateOptions() {
     int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
     checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
     uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
 
-    GcsCreateOptions createOptions =
-        GcsCreateOptions.builder()
-            .setGcsUploadBufferSizeBytes(uploadSizeBytes)
-            .setMimeType(MimeTypes.BINARY)
-            .build();
-
-    try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) {
-      return packageUtil.stageClasspathElements(
-          filesToStage, options.getStagingLocation(), createOptions);
-    }
+    return GcsCreateOptions.builder()
+        .setGcsUploadBufferSizeBytes(uploadSizeBytes)
+        .setMimeType(MimeTypes.BINARY)
+        .build();
   }
 }
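
For orientation, a hedged usage sketch of the new stageToFile entry point, mirroring the call shape in DataflowRunner above. The staging location is a placeholder value, and actually running it would need valid GCP credentials plus a writable bucket, so treat it as an illustration rather than a recipe.

    import com.google.api.services.dataflow.model.DataflowPackage;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class StageToFileUsageSketch {
      public static void main(String[] args) {
        // The stagingLocation below is an example value, not a real bucket.
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs("--stagingLocation=gs://example-bucket/staging")
                .as(DataflowPipelineOptions.class);

        byte[] payload = "example payload".getBytes(StandardCharsets.UTF_8);

        // Same call shape as in DataflowRunner above: stage raw bytes under a base name.
        DataflowPackage staged = options.getStager().stageToFile(payload, "pipeline.pb");
        System.out.println("Staged to " + staged.getLocation());
      }
    }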

http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 449b36d..565e965 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.fasterxml.jackson.core.Base64Variants;
 import com.google.api.client.util.BackOff;
@@ -29,6 +30,7 @@ import com.google.common.base.Function;
 import com.google.common.hash.Funnels;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+import com.google.common.io.ByteSource;
 import com.google.common.io.CountingOutputStream;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AsyncFunction;
@@ -51,6 +53,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
 import org.apache.beam.sdk.io.FileSystems;
@@ -182,11 +185,11 @@ class PackageUtil implements Closeable {
   private StagingResult stagePackageSynchronously(
       PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions)
       throws IOException, InterruptedException {
-    File source = attributes.getSource();
+    String sourceDescription = attributes.getSourceDescription();
     String target = attributes.getDestination().getLocation();
 
     if (alreadyStaged(attributes)) {
-      LOG.debug("Skipping file already staged: {} at {}", source, target);
+      LOG.debug("Skipping file already staged: {} at {}", sourceDescription, 
target);
       return StagingResult.cached(attributes);
     }
 
@@ -194,14 +197,14 @@ class PackageUtil implements Closeable {
       return tryStagePackageWithRetry(attributes, retrySleeper, createOptions);
     } catch (Exception miscException) {
       throw new RuntimeException(
-          String.format("Could not stage %s to %s", source, target), 
miscException);
+          String.format("Could not stage %s to %s", sourceDescription, 
target), miscException);
     }
   }
 
   private StagingResult tryStagePackageWithRetry(
       PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions)
       throws IOException, InterruptedException {
-    File source = attributes.getSource();
+    String sourceDescription = attributes.getSourceDescription();
     String target = attributes.getDestination().getLocation();
     BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
 
@@ -217,19 +220,22 @@ class PackageUtil implements Closeable {
                       + "of %s. Please verify credentials are valid and that 
you have "
                       + "write access to %s. Stale credentials can be resolved 
by executing "
                       + "'gcloud auth application-default login'.",
-                  source, target);
+                  sourceDescription, target);
           LOG.error(errorMessage);
           throw new IOException(errorMessage, ioException);
         }
 
         long sleep = backoff.nextBackOffMillis();
         if (sleep == BackOff.STOP) {
-          LOG.error("Upload failed, will NOT retry staging of package: {}", 
source, ioException);
+          LOG.error(
+              "Upload failed, will NOT retry staging of package: {}",
+              sourceDescription,
+              ioException);
           throw new RuntimeException("Could not stage %s to %s", ioException);
         } else {
           LOG.warn(
               "Upload attempt failed, sleeping before retrying staging of 
package: {}",
-              source,
+              sourceDescription,
               ioException);
           retrySleeper.sleep(sleep);
         }
@@ -237,16 +243,29 @@ class PackageUtil implements Closeable {
     }
   }
 
-  private StagingResult tryStagePackage(
-      PackageAttributes attributes, CreateOptions createOptions)
+  private StagingResult tryStagePackage(PackageAttributes attributes, CreateOptions createOptions)
       throws IOException, InterruptedException {
-    File source = attributes.getSource();
+    String sourceDescription = attributes.getSourceDescription();
     String target = attributes.getDestination().getLocation();
 
-    LOG.info("Uploading {} to {}", source, target);
+    LOG.info("Uploading {} to {}", sourceDescription, target);
     try (WritableByteChannel writer =
         FileSystems.create(FileSystems.matchNewResource(target, false), createOptions)) {
-      copyContent(attributes.getSource(), writer);
+      if (attributes.getBytes() != null) {
+        ByteSource.wrap(attributes.getBytes()).copyTo(Channels.newOutputStream(writer));
+      } else {
+        File sourceFile = attributes.getSource();
+        checkState(
+            sourceFile != null,
+            "Internal inconsistency: we tried to stage something to %s, but neither a source file "
+                + "nor the byte content was specified",
+            target);
+        if (sourceFile.isDirectory()) {
+          ZipFiles.zipDirectory(sourceFile, Channels.newOutputStream(writer));
+        } else {
+          Files.asByteSource(sourceFile).copyTo(Channels.newOutputStream(writer));
+        }
+      }
     }
     return StagingResult.uploaded(attributes);
   }
@@ -272,6 +291,24 @@ class PackageUtil implements Closeable {
         classpathElements, stagingPath, DEFAULT_SLEEPER, DEFAULT_CREATE_OPTIONS);
   }
 
+  public DataflowPackage stageToFile(
+      byte[] bytes, String target, String stagingPath, CreateOptions createOptions) {
+    try {
+      return stagePackage(
+              PackageAttributes.forBytesToStage(bytes, target, stagingPath),
+              DEFAULT_SLEEPER,
+              createOptions)
+          .get()
+          .getPackageAttributes()
+          .getDestination();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while staging pipeline", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Error while staging pipeline", e.getCause());
+    }
+  }
+
   /**
    * Transfers the classpath elements to the staging location.
    *
@@ -386,23 +423,6 @@ class PackageUtil implements Closeable {
     return fileName + "-" + contentHash + "." + fileExtension;
   }
 
-  /**
-   * Copies the contents of the classpathElement to the output channel.
-   *
-   * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
-   * otherwise the file contents are copied as-is.
-   *
-   * <p>The output channel is not closed.
-   */
-  private static void copyContent(File classpathElement, WritableByteChannel outputChannel)
-      throws IOException {
-    if (classpathElement.isDirectory()) {
-      ZipFiles.zipDirectory(classpathElement, Channels.newOutputStream(outputChannel));
-    } else {
-      Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel));
-    }
-  }
-
   @AutoValue
   abstract static class StagingResult {
     abstract PackageAttributes getPackageAttributes();
@@ -456,7 +476,26 @@ class PackageUtil implements Closeable {
       target.setName(uniqueName);
       target.setLocation(resourcePath);
 
-      return new AutoValue_PackageUtil_PackageAttributes(source, target, size, hash);
+      return new AutoValue_PackageUtil_PackageAttributes(source, null, target, size, hash);
+    }
+
+    public static PackageAttributes forBytesToStage(
+        byte[] bytes, String targetName, String stagingPath) {
+      Hasher hasher = Hashing.md5().newHasher();
+      String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.putBytes(bytes).hash().asBytes());
+      long size = bytes.length;
+
+      String uniqueName = getUniqueContentName(new File(targetName), hash);
+
+      String resourcePath =
+          FileSystems.matchNewResource(stagingPath, true)
+              .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE)
+              .toString();
+      DataflowPackage target = new DataflowPackage();
+      target.setName(uniqueName);
+      target.setLocation(resourcePath);
+
+      return new AutoValue_PackageUtil_PackageAttributes(null, bytes, target, size, hash);
     }
 
     public PackageAttributes withPackageName(String overridePackageName) {
@@ -465,12 +504,17 @@ class PackageUtil implements Closeable {
       newDestination.setLocation(getDestination().getLocation());
 
       return new AutoValue_PackageUtil_PackageAttributes(
-          getSource(), newDestination, getSize(), getHash());
+          getSource(), getBytes(), newDestination, getSize(), getHash());
     }
 
-    /** @return the file to be uploaded */
+    /** @return the file to be uploaded, if any */
+    @Nullable
     public abstract File getSource();
 
+    /** @return the bytes to be uploaded, if any */
+    @Nullable
+    public abstract byte[] getBytes();
+
     /** @return the dataflowPackage */
     public abstract DataflowPackage getDestination();
 
@@ -479,5 +523,13 @@ class PackageUtil implements Closeable {
 
     /** @return the hash */
     public abstract String getHash();
+
+    public String getSourceDescription() {
+      if (getSource() != null) {
+        return getSource().toString();
+      } else {
+        return String.format("<%s bytes, hash %s>", getSize(), getHash());
+      }
+    }
   }
 }
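
The new forBytesToStage keeps the same content-addressed naming as the file-based path: the staged name embeds a URL-safe base64 MD5 of the bytes, so identical content resolves to the same object name. Below is a small standalone approximation of that naming scheme, assuming only the format visible in getUniqueContentName above (fileName + "-" + contentHash + "." + fileExtension); it is a sketch, not Beam code.

    import com.fasterxml.jackson.core.Base64Variants;
    import com.google.common.hash.Hashing;
    import com.google.common.io.Files;
    import java.io.File;
    import java.nio.charset.StandardCharsets;

    public class ContentNameSketch {
      // Approximates the naming scheme above: base name + "-" + content hash + "." + extension.
      static String uniqueContentName(File target, byte[] bytes) {
        String contentHash =
            Base64Variants.MODIFIED_FOR_URL.encode(
                Hashing.md5().newHasher().putBytes(bytes).hash().asBytes());
        String fileName = Files.getNameWithoutExtension(target.getAbsolutePath());
        String fileExtension = Files.getFileExtension(target.getAbsolutePath());
        return fileName + "-" + contentHash + "." + fileExtension;
      }

      public static void main(String[] args) {
        byte[] bytes = "example pipeline bytes".getBytes(StandardCharsets.UTF_8);
        // Prints something like: pipeline-<hash>.pb
        System.out.println(uniqueContentName(new File("pipeline.pb"), bytes));
      }
    }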

http://git-wip-us.apache.org/repos/asf/beam/blob/090c5124/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
index f0be941..0b2013e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/Stager.java
@@ -43,4 +43,9 @@ public interface Stager {
    * contents of the staged file.
    */
   List<DataflowPackage> stageFiles(List<String> filesToStage);
+
+  /**
+   * Stage bytes to a target file name wherever this stager stages things.
+   */
+  DataflowPackage stageToFile(byte[] bytes, String baseName);
 }
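
To make the widened contract concrete, here is a hypothetical Stager that stages into a local directory. It is not part of this commit and assumes only the methods visible or referenced in this diff (stageDefaultFiles, stageFiles, and the new stageToFile); the DataflowPackage name and location are filled in the same way PackageUtil does.

    import com.google.api.services.dataflow.model.DataflowPackage;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.runners.dataflow.util.Stager;

    // Hypothetical local-filesystem stager, purely to illustrate the interface contract.
    public class LocalDirectoryStager implements Stager {
      private final Path stagingDir;

      public LocalDirectoryStager(Path stagingDir) {
        this.stagingDir = stagingDir;
      }

      @Override
      public List<DataflowPackage> stageDefaultFiles() {
        // This sketch does not inspect the classpath; a real stager would stage it here.
        return Collections.emptyList();
      }

      @Override
      public List<DataflowPackage> stageFiles(List<String> filesToStage) {
        List<DataflowPackage> staged = new ArrayList<>();
        for (String file : filesToStage) {
          Path source = Paths.get(file);
          Path target = stagingDir.resolve(source.getFileName().toString());
          try {
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
          staged.add(describe(target));
        }
        return staged;
      }

      @Override
      public DataflowPackage stageToFile(byte[] bytes, String baseName) {
        // Bytes go straight to the target file; no intermediate temp file is created.
        Path target = stagingDir.resolve(baseName);
        try {
          Files.write(target, bytes);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        return describe(target);
      }

      private DataflowPackage describe(Path target) {
        DataflowPackage pkg = new DataflowPackage();
        pkg.setName(target.getFileName().toString());
        pkg.setLocation(target.toString());
        return pkg;
      }
    }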
