[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r198129013

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
@@ -114,17 +114,12 @@ public JarRunHandler(
 		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
 			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-			final List<PermanentBlobKey> keys;
-			try {
-				keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+			try (BlobClient blobClient = new BlobClient(address, configuration)) {
+				ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
--- End diff --

The `JarRunHandler` now also uses `uploadJobGraphFiles()`.

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r198030793

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
(quotes the same JarRunHandler.java hunk as the first comment above)
--- End diff --

But it would reduce code redundancy, right? If this is the case, then let's do it.

---
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197721680

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
(quotes the same JarRunHandler.java hunk as the first comment above)
--- End diff --

We _could_ use `uploadJobGraphFiles` here, but there isn't really a use-case for uploading distributed cache artifacts when going through the `JarRunHandler`, since we're already on the server here.

---
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197496529

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
 		return this.userJarBlobKeys;
 	}

-	/**
-	 * Uploads the previously added user JAR files to the job manager through
-	 * the job manager's BLOB server. The BLOB servers' address is given as a
-	 * parameter. This function issues a blocking call.
-	 *
-	 * @param blobServerAddress of the blob server to upload the jars to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the JobManager failed.
-	 */
-	public void uploadUserJars(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-		if (!userJars.isEmpty()) {
-			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-				blobServerAddress, blobClientConfig, jobID, userJars);
-
-			for (PermanentBlobKey blobKey : blobKeys) {
-				if (!userJarBlobKeys.contains(blobKey)) {
-					userJarBlobKeys.add(blobKey);
-				}
-			}
-		}
-	}
-
 	@Override
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";
 	}

-	/**
-	 * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them
-	 * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor.
-	 *
-	 * @param blobServerAddress of the blob server to upload the files to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the Blob server failed.
-	 */
-	public void uploadUserArtifacts(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-			Path filePath = new Path(userArtifact.getValue().filePath);
-
-			try {
-				if (filePath.getFileSystem().isDistributedFS()) {
-					distributeViaDFS.add(userArtifact);
-				} else {
-					uploadToBlobServer.add(userArtifact);
-				}
-
-			} catch (IOException ex) {
-				distributeViaDFS.add(userArtifact);
-			}
+	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) {
+		byte[] serializedBlobKey;
+		try {
+			serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
 		}
+		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+			originalEntry.filePath,
+			originalEntry.isExecutable,
+			serializedBlobKey,
+			originalEntry.isZipped
+		));
+	}

-		uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+	public void finalizeUserArtifactEntries() {
--- End diff --

https://issues.apache.org/jira/browse/FLINK-8713

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197487498

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
(quotes the same hunk on `finalizeUserArtifactEntries()` as the earlier JobGraph.java comment)
--- End diff --

Alright, please create a follow-up JIRA issue.

---
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197472732

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
(quotes the same hunk on `finalizeUserArtifactEntries()` as the earlier JobGraph.java comment)
--- End diff --

I agree that this would be nice, but I think that this is out of scope of this PR, as we would have to touch an entirely new set of classes.

---
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197467939

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private static BlobServer blobServer;
+
+	@BeforeClass
+	public static void setup() throws IOException {
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
+	}
+
+	@Test
+	public void uploadAndSetUserJars() throws IOException {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+
+		Collection<Path> jars = Arrays.asList(
+			new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+			new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+		jars.forEach(jobGraph::addJar);
+
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+		try (BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())) {
+			ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
+		}

+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
--- End diff --

I will use `blobServer.getFile()` instead, to verify the validity of the blob keys.

---
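[Editor's sketch] The comment above suggests verifying blob keys by actually retrieving the uploaded files rather than only counting distinct keys. A stdlib-only Java sketch of that verification idea, using a hypothetical content-addressed `BlobStoreSketch` in place of Flink's `BlobServer` (all names here are illustrative, not Flink API):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative in-memory blob store: each put returns a key derived from
 * the content, and get must resolve the key back to the exact bytes.
 * This mirrors the stronger test assertion (fetch by key and compare)
 * instead of merely checking key counts.
 */
class BlobStoreSketch {

	private final Map<String, byte[]> blobs = new HashMap<>();

	/** Content-addressed put, mimicking how a blob key identifies one upload. */
	String put(byte[] content) throws NoSuchAlgorithmException {
		byte[] digest = MessageDigest.getInstance("SHA-256").digest(content);
		StringBuilder key = new StringBuilder();
		for (byte b : digest) {
			key.append(String.format("%02x", b));
		}
		blobs.put(key.toString(), content.clone());
		return key.toString();
	}

	/** Analogue of fetching a blob by key: an unknown key is a hard failure. */
	byte[] get(String key) {
		byte[] content = blobs.get(key);
		if (content == null) {
			throw new IllegalArgumentException("unknown blob key: " + key);
		}
		return content;
	}
}
```

The point of the stronger assertion: distinct keys alone could still be dangling, while a successful `get` per key proves each upload is retrievable.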
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197435093

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
(quotes the same hunk on `finalizeUserArtifactEntries()` as the earlier JobGraph.java comment)
--- End diff --

Maybe rename to `writeUserArtifactEntriesToConfiguration`.

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197434489

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
(quotes the same JobGraph.java hunk as the earlier comment, up to the line
	throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
)
--- End diff --

I would not throw a `FlinkRuntimeException` here. Instead we could let the `IOException` bubble up.

---
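[Editor's sketch] The suggestion above is to declare the checked `IOException` on `setUserArtifactBlobKey` rather than wrapping it in a runtime exception. A minimal stdlib-only Java sketch of that shape, with hypothetical stand-ins (`ArtifactKeyExample`, `CacheEntry`) mirroring the Flink names rather than using the real classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in for JobGraph's artifact bookkeeping. */
class ArtifactKeyExample {

	/** Simplified analogue of DistributedCache.DistributedCacheEntry. */
	static final class CacheEntry {
		final String filePath;
		final byte[] blobKey; // serialized blob key; null until uploaded

		CacheEntry(String filePath, byte[] blobKey) {
			this.filePath = filePath;
			this.blobKey = blobKey;
		}
	}

	private final Map<String, CacheEntry> userArtifacts = new HashMap<>();

	void addArtifact(String name, String path) {
		userArtifacts.put(name, new CacheEntry(path, null));
	}

	CacheEntry get(String name) {
		return userArtifacts.get(name);
	}

	/**
	 * Declares IOException instead of wrapping it: serialization happens
	 * before the map update, so the checked exception simply bubbles up
	 * to the caller, which decides how to handle a failed upload.
	 */
	void setUserArtifactBlobKey(String entryName, Serializable blobKey) throws IOException {
		final byte[] serialized = serialize(blobKey); // may throw IOException
		// computeIfPresent leaves unknown entry names untouched,
		// matching the reviewed code's behavior.
		userArtifacts.computeIfPresent(entryName,
			(name, old) -> new CacheEntry(old.filePath, serialized));
	}

	private static byte[] serialize(Serializable value) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(value);
		}
		return bos.toByteArray();
	}
}
```

The design point: serializing eagerly outside the `computeIfPresent` lambda is what makes the checked-exception signature possible, since a lambda passed to `computeIfPresent` cannot throw `IOException` itself.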
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197433761

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+	;
+
+	/**
+	 * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph jobgraph requiring user jars
+	 * @param blobClient client to upload jars with
+	 * @throws IOException if the upload fails
+	 */
+	public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
--- End diff --

Good point, let's do it then.

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197433655

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
 				final int blobServerPort = response.port;
 				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-				final List<PermanentBlobKey> keys;
-				try {
-					log.info("Uploading jar files.");
-					keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-					jobGraph.uploadUserArtifacts(address, flinkConfig);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-				}
-				for (PermanentBlobKey key : keys) {
-					jobGraph.addUserJarBlobKey(key);
+				List<Path> userJars = jobGraph.getUserJars();
+				Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+				if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
+					try (BlobClient client = new BlobClient(address, flinkConfig)) {
--- End diff --

I would be in favour of having a `ClientUtils#uploadJobGraphFiles(jobGraph, flinkConfig, Supplier<BlobClient>)` which basically does what's being done here.

---
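[Editor's sketch] The proposal above is a helper that owns the client lifecycle while the caller only supplies a client factory. A stdlib-only Java sketch of that shape; `UploadSketch`, `ClientSupplier`, `UploadClient`, and `Graph` are all hypothetical stand-ins for the Flink types, not Flink API:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the suggested ClientUtils#uploadJobGraphFiles shape: the helper
 * decides whether a client is needed at all, opens it via the supplier,
 * uploads everything, records the keys, and always closes the client.
 */
class UploadSketch {

	/** Supplier that may fail on creation, like opening a BlobClient connection. */
	interface ClientSupplier {
		UploadClient get() throws IOException;
	}

	/** Minimal closeable stand-in for BlobClient. */
	interface UploadClient extends AutoCloseable {
		String uploadFile(String path) throws IOException;

		@Override
		void close() throws IOException;
	}

	/** Minimal stand-in for JobGraph's jar list plus blob-key list. */
	static final class Graph {
		final List<String> userJars = new ArrayList<>();
		final List<String> userJarBlobKeys = new ArrayList<>();
	}

	static void uploadJobGraphFiles(Graph graph, ClientSupplier clientSupplier) throws IOException {
		if (graph.userJars.isEmpty()) {
			return; // nothing to upload: don't even open a connection
		}
		// try-with-resources guarantees the client is closed on success or failure
		try (UploadClient client = clientSupplier.get()) {
			for (String jar : graph.userJars) {
				graph.userJarBlobKeys.add(client.uploadFile(jar));
			}
		}
	}
}
```

Passing a supplier instead of an open client removes the duplicated open/check/close code at each call site (RestClusterClient, JarRunHandler), which is the redundancy discussed earlier in the thread.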
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197430670

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
(license header, imports, and class javadoc as quoted in the earlier ClientUtils.java comment)
+	public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+		setUserJarBlobKeys(blobKeys, jobGraph);
+	}
+
+	private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+		for (Path jar : userJars) {
+			final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar);
+			blobKeys.add(blobKey);
+		}
+		return blobKeys;
+	}
+
+	private static void setUserJarBlobKeys(Collection<PermanentBlobKey> blobKeys, JobGraph jobGraph) {
+		blobKeys.forEach(jobGraph::addUserJarBlobKey);
+	}
+
+	/**
+	 * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph jobgraph requiring user artifacts
+	 * @param blobClient client to upload artifacts with
+	 * @throws IOException if the upload fails
+	 */
+	public static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), jobGraph.getUserArtifacts(), blobClient);
+		setUserArtifactBlobKeys(jobGraph, blobKeys);
+	}
+
+	private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Map<String, DistributedCache.DistributedCacheEntry> userArtifacts, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = new ArrayList<>(userArtifacts.size());
+		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
+			Path path = new Path(userArtifact.getValue().filePath);
+			// only upload local files
+			if (!path.getFileSystem().isDistributedFS()) {
+				final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, new Path(userArtifact.getValue().filePath));
--- End diff --

We could reuse `path` here.

---
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197438267

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java ---
(quotes the same ClientUtilsTest.java diff as the earlier comment, ending at the same distinct-count assertion)
--- End diff --

Assert that we find the blob keys in the blob upload directory.

---
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197437750 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() { return this.userJarBlobKeys; } - /** -* Uploads the previously added user JAR files to the job manager through -* the job manager's BLOB server. The BLOB servers' address is given as a -* parameter. This function issues a blocking call. -* -* @param blobServerAddress of the blob server to upload the jars to -* @param blobClientConfig the blob client configuration -* @throws IOException Thrown, if the file upload to the JobManager failed. -*/ - public void uploadUserJars( - InetSocketAddress blobServerAddress, - Configuration blobClientConfig) throws IOException { - if (!userJars.isEmpty()) { - List blobKeys = BlobClient.uploadFiles( - blobServerAddress, blobClientConfig, jobID, userJars); - - for (PermanentBlobKey blobKey : blobKeys) { - if (!userJarBlobKeys.contains(blobKey)) { - userJarBlobKeys.add(blobKey); - } - } - } - } - @Override public String toString() { return "JobGraph(jobId: " + jobID + ")"; } - /** -* Configures JobGraph with user specified artifacts. If the files are in local system it uploads them -* to the BLOB server, otherwise it just puts metadata for future remote access from within task executor. -* -* @param blobServerAddress of the blob server to upload the files to -* @param blobClientConfig the blob client configuration -* @throws IOException Thrown, if the file upload to the Blob server failed. 
-*/ - public void uploadUserArtifacts( - InetSocketAddress blobServerAddress, - Configuration blobClientConfig) throws IOException { - - Set> uploadToBlobServer = new HashSet<>(); - Set> distributeViaDFS = new HashSet<>(); - - for (Map.Entry userArtifact : userArtifacts.entrySet()) { - Path filePath = new Path(userArtifact.getValue().filePath); - - try { - if (filePath.getFileSystem().isDistributedFS()) { - distributeViaDFS.add(userArtifact); - } else { - uploadToBlobServer.add(userArtifact); - } - - } catch (IOException ex) { - distributeViaDFS.add(userArtifact); - } + public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) { + byte[] serializedBlobKey; + try { + serializedBlobKey = InstantiationUtil.serializeObject(blobKey); + } catch (IOException e) { + throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e); } + userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry( + originalEntry.filePath, + originalEntry.isExecutable, + serializedBlobKey, + originalEntry.isZipped + )); + } - uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer); - - for (Map.Entry userArtifact : distributeViaDFS) { + public void finalizeUserArtifactEntries() { --- End diff -- I think we would not need this method if we don't write the `DistributedCacheEntries` into the configuration. If I'm not mistaken, then we send the `userArtifacts` map anyway to the cluster. The things which are still missing are: adding a serial version UID to the `DistributedCacheEntry`, and adding the `userArtifacts` to the `TaskDeploymentDescriptor` to send them to the `TaskManager`. ---
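The first of the missing pieces listed above — an explicit serial version UID — can be illustrated with a minimal, self-contained sketch. `CacheEntry` below is a simplified stand-in for `DistributedCache.DistributedCacheEntry`, not the real class; the point is only how the UID pins the serialized form that travels between client and cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerialUidDemo {

    // Simplified stand-in for DistributedCache.DistributedCacheEntry;
    // the real class has more fields (blob key, isZipped, ...).
    static class CacheEntry implements Serializable {
        // The piece the comment asks for: an explicit serial version UID,
        // so client and cluster builds agree on the serialized form even
        // after compatible changes to the class.
        private static final long serialVersionUID = 1L;

        final String filePath;
        final boolean isExecutable;

        CacheEntry(String filePath, boolean isExecutable) {
            this.filePath = filePath;
            this.isExecutable = isExecutable;
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip an entry the way it would travel to the cluster.
        CacheEntry copy = (CacheEntry) deserialize(serialize(new CacheEntry("/tmp/art1", true)));
        System.out.println(copy.filePath); // prints "/tmp/art1"
    }
}
```

Without the UID, the JVM derives one from the class structure, and any field change makes previously serialized entries unreadable — exactly what a `TaskDeploymentDescriptor` shipping `userArtifacts` to the `TaskManager` must avoid.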
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197438415 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.TestLogger; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; + +/** + * TODO: add javadoc. 
+ */ +public class ClientUtilsTest extends TestLogger { + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static BlobServer blobServer; + + @BeforeClass + public static void setup() throws IOException { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + blobServer = new BlobServer(config, new VoidBlobStore()); + blobServer.start(); + } + + @Test + public void uploadAndSetUserJars() throws IOException { + java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath(); + JobGraph jobGraph = new JobGraph(); + + Collection jars = Arrays.asList( + new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()), + new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString())); + + jars.forEach(jobGraph::addJar); + + assertEquals(jars.size(), jobGraph.getUserJars().size()); + assertEquals(0, jobGraph.getUserJarBlobKeys().size()); + + try (BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())) { + ClientUtils.uploadAndSetUserJars(jobGraph, blobClient); + } + + assertEquals(jars.size(), jobGraph.getUserJars().size()); + assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size()); + assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count()); + } + + @Test + public void uploadAndSetUserArtifacts() throws IOException { + java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath(); + JobGraph jobGraph = new JobGraph(); + + Collection localArtifacts = Arrays.asList( + new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art1")).toString(), true, true), + new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art2")).toString(), true, false), + new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art3")).toString(), false, true), + new 
DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art4")).toString(), true, false) + ); + + Collection distributedArtifacts = Arrays.asList( + new DistributedCache.DistributedCacheEntry("hdfs://localhost:1234/test", true, false) + ); +
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197377877 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) (BlobServerPortResponseBody response, String dispatcherAddress) -> { final int blobServerPort = response.port; final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); - final List keys; - try { - log.info("Uploading jar files."); - keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); - jobGraph.uploadUserArtifacts(address, flinkConfig); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not upload job files.", ioe)); - } - for (PermanentBlobKey key : keys) { - jobGraph.addUserJarBlobKey(key); + List userJars = jobGraph.getUserJars(); + Map userArtifacts = jobGraph.getUserArtifacts(); + if (!userJars.isEmpty() || !userArtifacts.isEmpty()) { + try (BlobClient client = new BlobClient(address, flinkConfig)) { --- End diff -- alternatively we could refactor the try-with-resource statement and exception handling into a method that accepts a function, which would be used like this:

```
ClientUtils.withBlobClient(address, flinkConfig, client -> {
    log.info("Uploading jar files.");
    ClientUtils.uploadAndSetUserJars(jobGraph, client);
    log.info("Uploading jar artifacts.");
    ClientUtils.uploadAndSetUserArtifacts(jobGraph, client);
});
```

---
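The wrapper suggested in the comment above could be shaped roughly as follows. This is a self-contained illustration of the pattern only: `FakeBlobClient` stands in for Flink's `BlobClient`, and wrapping in `UncheckedIOException` is a placeholder for whatever exception translation the real helper would do.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class WithBlobClientDemo {

    // Stand-in for Flink's BlobClient; the real one is opened against the
    // blob server address and configuration.
    static class FakeBlobClient implements AutoCloseable {
        String upload(String name) { return "key-" + name; }
        @Override public void close() { /* release the connection */ }
    }

    // A throwing consumer, so callers can use upload methods that declare IOException.
    @FunctionalInterface
    interface ClientAction {
        void accept(FakeBlobClient client) throws IOException;
    }

    // The pattern from the review comment: the helper owns the client's
    // lifecycle (try-with-resources) and translates IOException once,
    // instead of every call site repeating that boilerplate.
    static void withBlobClient(ClientAction action) {
        try (FakeBlobClient client = new FakeBlobClient()) {
            action.accept(client);
        } catch (IOException ioe) {
            throw new UncheckedIOException("Could not upload job files.", ioe);
        }
    }

    public static void main(String[] args) {
        StringBuilder keys = new StringBuilder();
        withBlobClient(client -> keys.append(client.upload("jar1")));
        System.out.println(keys); // prints "key-jar1"
    }
}
```

The custom functional interface is needed because `java.util.function.Consumer` cannot declare the checked `IOException` that blob uploads throw.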
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197375181 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +/** + * Contains utility methods for clients. + */ +public enum ClientUtils { + ; + + /** +* Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient}, +* and sets the appropriate blobkeys. 
+* +* @param jobGraph jobgraph requiring user jars +* @param blobClient client to upload jars with +* @throws IOException if the upload fails +*/ + public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException { + Collection blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient); + setUserJarBlobKeys(blobKeys, jobGraph); + } + + private static Collection uploadUserJars(JobID jobId, Collection userJars, BlobClient blobClient) throws IOException { + Collection blobKeys = new ArrayList<>(userJars.size()); + for (Path jar : userJars) { + final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar); + blobKeys.add(blobKey); + } + return blobKeys; + } + + private static void setUserJarBlobKeys(Collection blobKeys, JobGraph jobGraph) { + blobKeys.forEach(jobGraph::addUserJarBlobKey); + } + + /** +* Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient}, +* and sets the appropriate blobkeys. +* +* @param jobGraph jobgraph requiring user artifacts +* @param blobClient client to upload artifacts with +* @throws IOException if the upload fails +*/ + public static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException { + Collection> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), jobGraph.getUserArtifacts(), blobClient); + setUserArtifactBlobKeys(jobGraph, blobKeys); + } + + private static Collection> uploadUserArtifacts(JobID jobID, Map userArtifacts, BlobClient blobClient) throws IOException { --- End diff -- Signature could be changed to accept a Map instead. ---
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197279905 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +/** + * Contains utility methods for clients. + */ +public enum ClientUtils { + ; + + /** +* Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient}, +* and sets the appropriate blobkeys. 
+* +* @param jobGraph jobgraph requiring user jars +* @param blobClient client to upload jars with +* @throws IOException if the upload fails +*/ + public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException { --- End diff -- `JarRunHandler` could use this method as well. ---
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197279550 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) (BlobServerPortResponseBody response, String dispatcherAddress) -> { final int blobServerPort = response.port; final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); - final List keys; - try { - log.info("Uploading jar files."); - keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); - jobGraph.uploadUserArtifacts(address, flinkConfig); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not upload job files.", ioe)); - } - for (PermanentBlobKey key : keys) { - jobGraph.addUserJarBlobKey(key); + List userJars = jobGraph.getUserJars(); + Map userArtifacts = jobGraph.getUserArtifacts(); --- End diff -- this entire block is effectively duplicated in several classes and could also be moved to `ClientUtils`, but I wasn't sure whether that would put too much logic into a single method. ---
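The consolidation this comment weighs could look roughly like the sketch below. `FakeJobGraph` and `FakeBlobClient` are stubs standing in for the real Flink classes, and the helper name is hypothetical; the point is only that each submission path stops repeating the "check for files, open client, upload, record keys" block.

```java
import java.util.ArrayList;
import java.util.List;

public class UploadDedupeDemo {

    // Stubs standing in for Flink's BlobClient and JobGraph.
    static class FakeBlobClient implements AutoCloseable {
        String uploadFile(String path) { return "blob:" + path; }
        @Override public void close() { }
    }

    static class FakeJobGraph {
        final List<String> userJars = new ArrayList<>();
        final List<String> userJarBlobKeys = new ArrayList<>();
    }

    // The consolidated helper: every submission path calls this instead of
    // duplicating the upload block inline.
    static void uploadJobGraphFiles(FakeJobGraph jobGraph) {
        if (jobGraph.userJars.isEmpty()) {
            return; // nothing to upload, so don't open a connection at all
        }
        try (FakeBlobClient client = new FakeBlobClient()) {
            for (String jar : jobGraph.userJars) {
                jobGraph.userJarBlobKeys.add(client.uploadFile(jar));
            }
        }
    }

    public static void main(String[] args) {
        FakeJobGraph graph = new FakeJobGraph();
        graph.userJars.add("job.jar");
        uploadJobGraphFiles(graph);
        System.out.println(graph.userJarBlobKeys); // prints "[blob:job.jar]"
    }
}
```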
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197279258 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.TestLogger; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; + +/** + * TODO: add javadoc. --- End diff -- missing javadoc ---
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197279307 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() { return this.userJarBlobKeys; } - /** -* Uploads the previously added user JAR files to the job manager through -* the job manager's BLOB server. The BLOB servers' address is given as a -* parameter. This function issues a blocking call. -* -* @param blobServerAddress of the blob server to upload the jars to -* @param blobClientConfig the blob client configuration -* @throws IOException Thrown, if the file upload to the JobManager failed. -*/ - public void uploadUserJars( - InetSocketAddress blobServerAddress, - Configuration blobClientConfig) throws IOException { - if (!userJars.isEmpty()) { - List blobKeys = BlobClient.uploadFiles( - blobServerAddress, blobClientConfig, jobID, userJars); - - for (PermanentBlobKey blobKey : blobKeys) { - if (!userJarBlobKeys.contains(blobKey)) { - userJarBlobKeys.add(blobKey); - } - } - } - } - @Override public String toString() { return "JobGraph(jobId: " + jobID + ")"; } - /** -* Configures JobGraph with user specified artifacts. If the files are in local system it uploads them -* to the BLOB server, otherwise it just puts metadata for future remote access from within task executor. -* -* @param blobServerAddress of the blob server to upload the files to -* @param blobClientConfig the blob client configuration -* @throws IOException Thrown, if the file upload to the Blob server failed. 
-*/ - public void uploadUserArtifacts( - InetSocketAddress blobServerAddress, - Configuration blobClientConfig) throws IOException { - - Set> uploadToBlobServer = new HashSet<>(); - Set> distributeViaDFS = new HashSet<>(); - - for (Map.Entry userArtifact : userArtifacts.entrySet()) { - Path filePath = new Path(userArtifact.getValue().filePath); - - try { - if (filePath.getFileSystem().isDistributedFS()) { - distributeViaDFS.add(userArtifact); - } else { - uploadToBlobServer.add(userArtifact); - } - - } catch (IOException ex) { - distributeViaDFS.add(userArtifact); - } + public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) { + byte[] serializedBlobKey; + try { + serializedBlobKey = InstantiationUtil.serializeObject(blobKey); + } catch (IOException e) { + throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e); } + userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry( + originalEntry.filePath, + originalEntry.isExecutable, + serializedBlobKey, + originalEntry.isZipped + )); + } - uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer); - - for (Map.Entry userArtifact : distributeViaDFS) { + public void finalizeUserArtifactEntries() { --- End diff -- missing test ---
[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6199 [FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

## What is the purpose of the change

This PR moves the logic for uploading jars/artifacts from the jobgraph into a separate utility class usable by all submission methods. The new `ClientUtils` class exposes 2 methods for uploading jars/artifacts and setting the respective blob keys on the `JobGraph`. All existing job-submission methods were updated to use the new utilities and should now behave the same. The subsumed methods in `JobGraph` were removed, but remnants of them remain in 2 added methods:

* `setUserArtifactBlobKey` sets the blob key for a specific entry
* `finalizeUserArtifactEntries` writes the artifact entries into the `ExecutionConfig`

## Verifying this change

* `ClientUtils` is tested in `ClientUtilsTest`
* `JobGraph` changes are covered in `JobGraphTest`
* client modifications are covered by various existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9280_delta

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6199.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6199

commit 13e3dc7dc9c0b7205223368a460993a309cb58ad Author: zentol Date: 2018-06-13T16:21:21Z [FLINK-9624][runtime] Move jar/artifact upload out of jobgraph ---