[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r198129013
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -114,17 +114,12 @@ public JarRunHandler(
 
CompletableFuture jarUploadFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-   final List<PermanentBlobKey> keys;
-   try {
-   keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+   try (BlobClient blobClient = new BlobClient(address, configuration)) {
+   ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
--- End diff --

The `JarRunHandler` now also uses `uploadJobGraphFiles()`.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r198030793
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -114,17 +114,12 @@ public JarRunHandler(
 
CompletableFuture jarUploadFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-   final List<PermanentBlobKey> keys;
-   try {
-   keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+   try (BlobClient blobClient = new BlobClient(address, configuration)) {
+   ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
--- End diff --

But it would reduce code redundancy, right? If this is the case, then let's 
do it.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197721680
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -114,17 +114,12 @@ public JarRunHandler(
 
CompletableFuture jarUploadFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-   final List<PermanentBlobKey> keys;
-   try {
-   keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+   try (BlobClient blobClient = new BlobClient(address, configuration)) {
+   ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
--- End diff --

we _could_ use `uploadJobGraphFiles` here, but there isn't really a 
use-case for uploading distributed cache artifacts when going through the 
JarRunHandler, since we're already on the server here.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197496529
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

https://issues.apache.org/jira/browse/FLINK-8713


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197487498
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

Alright, please create a follow up JIRA issue.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197472732
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

I agree that this would be nice, but I think that it is out of scope for this 
PR, as we would have to touch an entirely new set of classes.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197467939
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static BlobServer blobServer;
+
+   @BeforeClass
+   public static void setup() throws IOException {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   blobServer = new BlobServer(config, new VoidBlobStore());
+   blobServer.start();
+   }
+
+   @Test
+   public void uploadAndSetUserJars() throws IOException {
+   java.nio.file.Path tmpDir = 
temporaryFolder.newFolder().toPath();
+   JobGraph jobGraph = new JobGraph();
+
+   Collection<Path> jars = Arrays.asList(
+   new 
Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+   new 
Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+   jars.forEach(jobGraph::addJar);
+
+   assertEquals(jars.size(), jobGraph.getUserJars().size());
+   assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+   try (BlobClient blobClient = new BlobClient(new 
InetSocketAddress("localhost", blobServer.getPort()), new Configuration())) {
+   ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
+   }
+
+   assertEquals(jars.size(), jobGraph.getUserJars().size());
+   assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+   assertEquals(jars.size(), 
jobGraph.getUserJarBlobKeys().stream().distinct().count());
--- End diff --

I will use `blobServer.getFile()` instead, to verify the validity of the 
blob keys.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197435093
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

Maybe rename to `writeUserArtifactEntriesToConfiguration`


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197434489
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List blobKeys = 
BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, 
userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set> 
uploadToBlobServer = new HashSet<>();
-   Set> 
distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry 
userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
--- End diff --

I would not throw a `FlinkRuntimeException` here. Instead we could let the 
`IOException` bubble up.
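A minimal stdlib-only sketch of the distinction being discussed (the class and method names are hypothetical stand-ins, not the Flink API): instead of catching the `IOException` and rethrowing an unchecked exception, the method declares `throws IOException`, so callers decide how to handle the failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for InstantiationUtil.serializeObject: the checked
// IOException is declared rather than wrapped in an unchecked exception.
class BlobKeySketch {
    static byte[] serializeBlobKey(Serializable blobKey) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(blobKey);
        }
        // No catch block here: an IOException simply bubbles up to the caller.
        return bos.toByteArray();
    }
}
```

The trade-off is the usual one for checked exceptions: the caller is forced to acknowledge the failure mode, at the cost of a `throws` clause propagating through the call chain.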


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197433761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+   ;
+
+   /**
+* Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph   jobgraph requiring user jars
+* @param blobClient client to upload jars with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient 
blobClient) throws IOException {
--- End diff --

Good point, let's do it then.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197433655
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
(BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
final int blobServerPort = response.port;
final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List<PermanentBlobKey> keys;
-   try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
-   }
 
-   for (PermanentBlobKey key : keys) {
-   jobGraph.addUserJarBlobKey(key);
+   List<Path> userJars = jobGraph.getUserJars();
+   Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+   if (!userJars.isEmpty() || 
!userArtifacts.isEmpty()) {
+   try (BlobClient client = new 
BlobClient(address, flinkConfig)) {
--- End diff --

I would be in favour of having a `ClientUtils#uploadJobGraphFiles(jobGraph, 
flinkConfig, Supplier)` which basically does what's being done here.
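A sketch of the suggested shape, using stdlib stand-ins rather than the Flink types (all names here are assumptions): passing a `Supplier` for the closeable client means the client is only created, and a connection only opened, when there is actually something to upload.

```java
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch of a uploadJobGraphFiles-style utility: the Supplier
// defers client creation until it is known that an upload is needed.
class UploadSketch {
    interface Client extends AutoCloseable {
        void upload(String file);
        @Override void close(); // no checked exception, to keep the sketch small
    }

    /** Returns the number of files uploaded; creates a client only if needed. */
    static int uploadFiles(List<String> files, Supplier<Client> clientFactory) {
        if (files.isEmpty()) {
            return 0; // skip client creation entirely
        }
        try (Client client = clientFactory.get()) {
            for (String file : files) {
                client.upload(file);
            }
        }
        return files.size();
    }
}
```

In the real code the factory would presumably construct a `BlobClient` from the resolved address and `flinkConfig`, and the upload methods would declare `IOException`.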


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197430670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+   ;
+
+   /**
+* Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph   jobgraph requiring user jars
+* @param blobClient client to upload jars with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient 
blobClient) throws IOException {
+   Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+   setUserJarBlobKeys(blobKeys, jobGraph);
+   }
+
+   private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+   Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+   for (Path jar : userJars) {
+   final PermanentBlobKey blobKey = 
blobClient.uploadFile(jobId, jar);
+   blobKeys.add(blobKey);
+   }
+   return blobKeys;
+   }
+
+   private static void setUserJarBlobKeys(Collection<PermanentBlobKey> blobKeys, JobGraph jobGraph) {
+   blobKeys.forEach(jobGraph::addUserJarBlobKey);
+   }
+
+   /**
+* Uploads the user artifacts from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph jobgraph requiring user artifacts
+* @param blobClient client to upload artifacts with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserArtifacts(JobGraph jobGraph, 
BlobClient blobClient) throws IOException {
+   Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), jobGraph.getUserArtifacts(), blobClient);
+   setUserArtifactBlobKeys(jobGraph, blobKeys);
+   }
+
+   private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Map<String, DistributedCache.DistributedCacheEntry> userArtifacts, BlobClient blobClient) throws IOException {
+   Collection<Tuple2<String, PermanentBlobKey>> blobKeys = new ArrayList<>(userArtifacts.size());
+   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
+   Path path = new Path(userArtifact.getValue().filePath);
+   // only upload local files
+   if (!path.getFileSystem().isDistributedFS()) {
+   final PermanentBlobKey blobKey = 
blobClient.uploadFile(jobID, new Path(userArtifact.getValue().filePath));
--- End diff --

we could reuse `path` here


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197438267
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static BlobServer blobServer;
+
+   @BeforeClass
+   public static void setup() throws IOException {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   blobServer = new BlobServer(config, new VoidBlobStore());
+   blobServer.start();
+   }
+
+   @Test
+   public void uploadAndSetUserJars() throws IOException {
+   java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+   JobGraph jobGraph = new JobGraph();
+
+   Collection<Path> jars = Arrays.asList(
+   new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+   new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+   jars.forEach(jobGraph::addJar);
+
+   assertEquals(jars.size(), jobGraph.getUserJars().size());
+   assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+   try (BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())) {
+   ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
+   }
+
+   assertEquals(jars.size(), jobGraph.getUserJars().size());
+   assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+   assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
--- End diff --

Assert that we find the blob keys in the blob upload directory.
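The suggested assertion can be sketched in a self-contained form. The hash-named storage layout and the `upload` helper below are illustrative assumptions, not Flink's actual `BlobServer` directory layout:

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;

public class BlobUploadAssertion {

    // Hypothetical stand-in for a blob server: stores each upload under a
    // content-hash key inside a storage directory.
    static String upload(Path storageDir, byte[] contents) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-1").digest(contents);
        StringBuilder key = new StringBuilder();
        for (byte b : digest) {
            key.append(String.format("%02x", b));
        }
        Files.write(storageDir.resolve(key.toString()), contents);
        return key.toString();
    }

    public static void main(String[] args) throws Exception {
        Path storageDir = Files.createTempDirectory("blob-storage");
        List<String> keys = new ArrayList<>();
        for (String payload : new String[]{"jar1", "jar2"}) {
            keys.add(upload(storageDir, payload.getBytes(StandardCharsets.UTF_8)));
        }
        // The review suggestion: beyond counting keys, assert that every
        // returned key corresponds to a file in the upload directory.
        for (String key : keys) {
            if (!Files.exists(storageDir.resolve(key))) {
                throw new AssertionError("blob key not found on disk: " + key);
            }
        }
        System.out.println("all " + keys.size() + " blob keys found on disk");
    }
}
```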


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197437750
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) {
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

I think we would not need this method if we don't write the 
`DistributedCacheEntries` into the configuration. If I'm not mistaken, we 
send the `userArtifacts` map to the cluster anyway. The missing pieces 
are: adding a serial version UID to the `DistributedCacheEntry`, and 
adding the `userArtifacts` to the `TaskDeploymentDescriptor` to send them to 
the `TaskManager`.
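The two missing pieces can be illustrated with a minimal sketch: a `Serializable` cache-entry analogue with a pinned `serialVersionUID`, round-tripped through Java serialization the way a deployment descriptor would carry it. The `CacheEntry` class here is a hypothetical stand-in, not Flink's `DistributedCacheEntry`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableEntry {

    // Hypothetical analogue of DistributedCache.DistributedCacheEntry: to ship
    // the entry directly (e.g. inside a deployment descriptor) it must be
    // Serializable, and pinning a serialVersionUID keeps the wire format
    // stable across compatible class changes.
    static class CacheEntry implements Serializable {
        private static final long serialVersionUID = 1L;
        final String filePath;
        final boolean isExecutable;

        CacheEntry(String filePath, boolean isExecutable) {
            this.filePath = filePath;
            this.isExecutable = isExecutable;
        }
    }

    public static void main(String[] args) throws Exception {
        CacheEntry entry = new CacheEntry("/tmp/art1", true);

        // Round-trip through Java serialization.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(entry);
        }
        CacheEntry copy;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            copy = (CacheEntry) ois.readObject();
        }
        System.out.println(copy.filePath + " executable=" + copy.isExecutable);
    }
}
```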


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197438415
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static BlobServer blobServer;
+
+   @BeforeClass
+   public static void setup() throws IOException {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   blobServer = new BlobServer(config, new VoidBlobStore());
+   blobServer.start();
+   }
+
+   @Test
+   public void uploadAndSetUserJars() throws IOException {
+   java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+   JobGraph jobGraph = new JobGraph();
+
+   Collection<Path> jars = Arrays.asList(
+   new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+   new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+   jars.forEach(jobGraph::addJar);
+
+   assertEquals(jars.size(), jobGraph.getUserJars().size());
+   assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+   try (BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())) {
+   ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
+   }
+
+   assertEquals(jars.size(), jobGraph.getUserJars().size());
+   assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+   assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
+   }
+
+   @Test
+   public void uploadAndSetUserArtifacts() throws IOException {
+   java.nio.file.Path tmpDir = 
temporaryFolder.newFolder().toPath();
+   JobGraph jobGraph = new JobGraph();
+
+   Collection<DistributedCache.DistributedCacheEntry> localArtifacts = Arrays.asList(
+   new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art1")).toString(), true, true),
+   new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art2")).toString(), true, false),
+   new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art3")).toString(), false, true),
+   new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art4")).toString(), true, false)
+   );
+
+   Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts = Arrays.asList(
+   new DistributedCache.DistributedCacheEntry("hdfs://localhost:1234/test", true, false)
+   );
+

[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197377877
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
(BlobServerPortResponseBody response, String dispatcherAddress) -> {
final int blobServerPort = response.port;
final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List<PermanentBlobKey> keys;
-   try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-   }

-   for (PermanentBlobKey key : keys) {
-   jobGraph.addUserJarBlobKey(key);
+   List<Path> userJars = jobGraph.getUserJars();
+   Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+   if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
+   try (BlobClient client = new BlobClient(address, flinkConfig)) {
--- End diff --

alternatively we could refactor the try-with-resource statement and 
exception handling into a method that accepts a function, which would be used 
like this:

```
ClientUtils.withBlobClient(address, flinkConfig, blobClient -> {
    log.info("Uploading jar files.");
    ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
    log.info("Uploading user artifacts.");
    ClientUtils.uploadAndSetUserArtifacts(jobGraph, blobClient);
});
```
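A runnable sketch of what such a helper could look like, written with generic `Closeable`/consumer types instead of Flink's classes. The names `withResource` and `ThrowingConsumer` are illustrative assumptions, and `PrintWriter` stands in for the `BlobClient`:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletionException;

public class WithResourceHelper {

    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T value) throws IOException;
    }

    // Hypothetical shape of the suggested ClientUtils.withBlobClient(...):
    // open the closeable resource, hand it to the caller, and fold the
    // try-with-resources block plus exception wrapping into one place.
    static <T extends Closeable> void withResource(T resource, ThrowingConsumer<T> action) {
        try (T r = resource) {
            action.accept(r);
        } catch (IOException ioe) {
            throw new CompletionException(new RuntimeException("Could not upload job files.", ioe));
        }
    }

    public static void main(String[] args) {
        StringWriter log = new StringWriter();
        withResource(new PrintWriter(log), w -> {
            w.println("Uploading jar files.");      // stands in for uploadAndSetUserJars
            w.println("Uploading user artifacts."); // stands in for uploadAndSetUserArtifacts
        });
        System.out.print(log);
    }
}
```

Each call site then shrinks to the lambda body, and the resource lifecycle and error wrapping live in exactly one place.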


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197375181
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+   ;
+
+   /**
+* Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph   jobgraph requiring user jars
+* @param blobClient client to upload jars with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+   Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+   setUserJarBlobKeys(blobKeys, jobGraph);
+   }
+
+   private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+   Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+   for (Path jar : userJars) {
+   final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar);
+   blobKeys.add(blobKey);
+   }
+   return blobKeys;
+   }
+
+   private static void setUserJarBlobKeys(Collection<PermanentBlobKey> blobKeys, JobGraph jobGraph) {
+   blobKeys.forEach(jobGraph::addUserJarBlobKey);
+   }
+
+   /**
+* Uploads the user artifacts from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph jobgraph requiring user artifacts
+* @param blobClient client to upload artifacts with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+   Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), jobGraph.getUserArtifacts(), blobClient);
+   setUserArtifactBlobKeys(jobGraph, blobKeys);
+   }
+
+   private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Map<String, DistributedCache.DistributedCacheEntry> userArtifacts, BlobClient blobClient) throws IOException {
--- End diff --

Signature could be changed to accept a Map instead.
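As an aside, `ClientUtils` above is declared as an enum with no constants. This is a Java idiom for utility holders: unlike a class with a private constructor, an empty enum can be instantiated neither directly nor via reflection or subclassing. A minimal standalone illustration:

```java
public class EnumUtilityDemo {

    // An enum with no constants is a non-instantiable utility holder:
    // the lone ';' means "zero constants", and only static members follow.
    enum MathUtils {
        ;

        static int clamp(int value, int min, int max) {
            return Math.max(min, Math.min(max, value));
        }
    }

    public static void main(String[] args) {
        System.out.println(MathUtils.clamp(15, 0, 10)); // above the range
        System.out.println(MathUtils.clamp(-3, 0, 10)); // below the range
    }
}
```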


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279905
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+   ;
+
+   /**
+* Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph   jobgraph requiring user jars
+* @param blobClient client to upload jars with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
--- End diff --

`JarRunHandler` could use this method as well.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279550
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
(BlobServerPortResponseBody response, String dispatcherAddress) -> {
final int blobServerPort = response.port;
final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List<PermanentBlobKey> keys;
-   try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-   }

-   for (PermanentBlobKey key : keys) {
-   jobGraph.addUserJarBlobKey(key);
+   List<Path> userJars = jobGraph.getUserJars();
+   Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
--- End diff --

this entire block is effectively duplicated in several classes and could 
also be moved to `ClientUtils`, but I wasn't sure whether that would put too 
much logic into a single method.


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279258
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
--- End diff --

missing javadoc


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279307
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-   Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) {
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-   for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

missing test
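The `setUserArtifactBlobKey` diff above relies on `Map.computeIfPresent`: replace an immutable map value with an augmented copy only if the entry exists, and leave absent keys alone. A self-contained form of the pattern (the `Entry` class is an illustrative stand-in for `DistributedCacheEntry`):

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfPresentDemo {

    // Immutable value object, an illustrative analogue of DistributedCacheEntry.
    static final class Entry {
        final String filePath;
        final boolean isExecutable;
        final byte[] blobKey;

        Entry(String filePath, boolean isExecutable, byte[] blobKey) {
            this.filePath = filePath;
            this.isExecutable = isExecutable;
            this.blobKey = blobKey;
        }
    }

    public static void main(String[] args) {
        Map<String, Entry> artifacts = new HashMap<>();
        artifacts.put("art1", new Entry("/tmp/art1", true, null));

        byte[] serializedKey = {1, 2, 3};
        // Replace the mapping only if it exists, building a new immutable
        // value that now carries the (serialized) blob key.
        artifacts.computeIfPresent("art1",
            (name, old) -> new Entry(old.filePath, old.isExecutable, serializedKey));
        // An unknown name is left untouched rather than inserted:
        // the remapping function is never invoked for absent keys.
        artifacts.computeIfPresent("missing",
            (name, old) -> new Entry(old.filePath, old.isExecutable, serializedKey));

        System.out.println(artifacts.get("art1").blobKey.length);
        System.out.println(artifacts.containsKey("missing"));
    }
}
```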


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6199

[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

## What is the purpose of the change

This PR moves the logic for uploading jars/artifacts from the jobgraph into 
a separate utility class usable by all submission methods.

The new `ClientUtils` class exposes 2 methods for uploading jars/artifacts 
and setting the respective blob keys on the `JobGraph`.
All existing job-submission methods were updated to use the new utilities 
and should now behave the same.

The subsumed methods in `JobGraph` were removed, but remnants of them 
remain in 2 added methods:
* `setUserArtifactBlobKey` sets the blob key for a specific entry
* `finalizeUserArtifactEntries` writes the artifact entries into the 
`ExecutionConfig`


## Verifying this change

* `ClientUtils` is tested in `ClientUtilsTest`
* `JobGraph` changes are covered in `JobGraphTest`
* client modifications are covered by various existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_delta

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6199


commit 13e3dc7dc9c0b7205223368a460993a309cb58ad
Author: zentol 
Date:   2018-06-13T16:21:21Z

[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph




---