[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531822#comment-16531822
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6203


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the JobGraph, and then uploads this graph to
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blob server before the job graph is submitted, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blob server and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531618#comment-16531618
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199867852
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+   objectOut.writeObject(jobGraph);
+   }
+   return jobGraphFile;
+   } catch (IOException e) {
+   throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
+   }
+   }, executorService);
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   CompletableFuture>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
-   try {
-   ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
-   }
+   filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
 
-   return jobGraph;
-   });
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   CompletableFuture submissionFuture = jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
-   }
-   });
+   final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
+   jobGraphFile.getFileName().toString(),
+   jarFileNames,
+   artifactFileNames);
+
+   return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
+   });
+
+   final CompletableFuture submissionFuture = requestFuture.thenCompose(
+   requestAndFileUploads -> sendRetriableRequest(
+   

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531612#comment-16531612
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199866326
  

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531319#comment-16531319
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199794599
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
+   jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) {
+   Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   Map temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
+   Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
+
+   Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
+   
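
The handler above indexes the uploaded files by their bare file name and then looks up every name listed in the request body. A minimal standalone sketch of that lookup follows. `UploadResolver` and `requireUploaded` are hypothetical names (the diff does not show the body of `getPathAndAssertUpload`), and the sketch throws `IllegalArgumentException` where the real handler presumably reports a `RestHandlerException`.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the name-based lookup in the handler diff.
final class UploadResolver {

    private UploadResolver() {}

    // Index uploaded files by their bare file name.
    // Collectors.toMap throws IllegalStateException if two uploads share a name.
    static Map<String, Path> indexByFileName(Collection<Path> uploadedFiles) {
        return uploadedFiles.stream().collect(Collectors.toMap(
                path -> path.getFileName().toString(),
                Function.identity()));
    }

    // Return the uploaded file for a name taken from the request body, or fail if
    // the client referenced a file it never uploaded (role of getPathAndAssertUpload).
    static Path requireUploaded(String fileName, String type, Map<String, Path> nameToFile) {
        Path file = nameToFile.get(fileName);
        if (file == null) {
            throw new IllegalArgumentException(
                    type + " file " + fileName + " was not among the uploaded files.");
        }
        return file;
    }

    // Resolve every jar name listed in the request body.
    static List<Path> resolveJars(List<String> jarFileNames, Map<String, Path> nameToFile) {
        List<Path> jars = new ArrayList<>(jarFileNames.size());
        for (String name : jarFileNames) {
            jars.add(requireUploaded(name, "Jar", nameToFile));
        }
        return jars;
    }
}
```

Because the index is keyed by file name only, this approach assumes the client never uploads two files with the same name in one request; `Collectors.toMap` rejects that case with an `IllegalStateException`.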

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531320#comment-16531320
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199793741
  

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531258#comment-16531258
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780706
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
@@ -105,7 +106,8 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque
messageHeaders.getResponseStatusCode(),
responseHeaders);
}
-   });
+   }).whenComplete((P resp, Throwable throwable) -> processingFinishedFuture.complete(null));
--- End diff --

I think we are swallowing potential exceptions here. It would be
better to do something like
```
return response.whenComplete(...).thenApply(ignored -> null)
```

That way we would also get rid of the `processingFinishedFuture`.
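
The difference between the two patterns can be reproduced with plain `CompletableFuture`s. In the sketch below (hypothetical class and method names, with `String` standing in for the response type `P`), the first method mirrors the diff by completing a separate future with `null`, so a failed response looks like a success to the caller; the second follows the suggestion, so the failure stays visible.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; String stands in for the handler's response type.
final class CompletionPropagation {

    private CompletionPropagation() {}

    // Mirrors the diff: a separate future is always completed normally.
    static CompletableFuture<Void> withSeparateFuture(CompletableFuture<String> response) {
        CompletableFuture<Void> processingFinished = new CompletableFuture<>();
        response
                .whenComplete((resp, throwable) -> {
                    // send the HTTP response or the error message here
                })
                .whenComplete((resp, throwable) -> processingFinished.complete(null));
        // A failure of 'response' never reaches 'processingFinished'.
        return processingFinished;
    }

    // Follows the review suggestion: derive the result from the chain itself.
    static CompletableFuture<Void> derivedFromChain(CompletableFuture<String> response) {
        return response
                .whenComplete((resp, throwable) -> {
                    // send the HTTP response or the error message here
                })
                .thenApply(ignored -> null);
    }
}
```

A future returned by `whenComplete` completes exceptionally whenever its source does, and `thenApply` passes that failure through, so the derived future carries the error without any extra bookkeeping.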




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531256#comment-16531256
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199780490
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

This is a good point.




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531094#comment-16531094
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199741252
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java ---
@@ -18,64 +18,118 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * We currently require the job-jars to be uploaded through the blob-server.
+ * This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+   private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+   private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+   private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";
 
-   /**
-* The serialized job graph.
-*/
-   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-   public final byte[] serializedJobGraph;
+   @JsonProperty(FIELD_NAME_JOB_GRAPH)
+   public final String jobGraphFileName;
 
-   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-   this(serializeJobGraph(jobGraph));
-   }
+   @JsonProperty(FIELD_NAME_JOB_JARS)
+   public final Collection jarFileNames;
+
+   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+   public final Collection artifactFileNames;
 
-   @JsonCreator
--- End diff --

revert
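
With this change, the JSON part of a submission only names files that travel alongside it in the same multipart request. A minimal sketch of that JSON, built by hand to stay dependency-free: the three top-level field names are taken from the diff above, while the keys inside each artifact entry (`entryName`, `fileName`) are an assumption based on the `DistributedCacheFile` fields the handler reads. The real class is serialized by Jackson via `@JsonProperty`; `SubmitBodySketch` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the JSON body. Top-level names come from JobSubmitRequestBody;
// the keys inside each artifact object are assumed.
final class SubmitBodySketch {

    private SubmitBodySketch() {}

    static final class Artifact {
        final String entryName; // distributed-cache entry name
        final String fileName;  // name of the uploaded file

        Artifact(String entryName, String fileName) {
            this.entryName = entryName;
            this.fileName = fileName;
        }
    }

    // Minimal JSON string quoting: escapes backslashes and double quotes only
    // (control characters are not handled).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String toJson(String jobGraphFileName, List<String> jarFileNames, List<Artifact> artifacts) {
        String jars = jarFileNames.stream()
                .map(SubmitBodySketch::quote)
                .collect(Collectors.joining(",", "[", "]"));
        String arts = artifacts.stream()
                .map(a -> "{" + quote("entryName") + ":" + quote(a.entryName)
                        + "," + quote("fileName") + ":" + quote(a.fileName) + "}")
                .collect(Collectors.joining(",", "[", "]"));
        return "{" + quote("jobGraphFileName") + ":" + quote(jobGraphFileName)
                + "," + quote("jobJarFileNames") + ":" + jars
                + "," + quote("jobArtifactFileNames") + ":" + arts + "}";
    }
}
```

Each name in this body must match the file name of one part of the multipart upload; the handler resolves the names and rejects the request if one is missing.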




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529700#comment-16529700
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199456197
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -119,15 +113,17 @@ public void testSuccessfulJobSubmission() throws Exception {
}
}
 
-   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-   when(mockGateway.getHostname()).thenReturn("localhost");
-   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
-   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
-   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+   TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
--- End diff --

This block is written the way it is because the methods that
`TestingDispatcherGateway.Builder` inherits return a
`TestingRestfulGateway.Builder`, and the same applies to `build`.
It's a bit cumbersome, but I couldn't find a solution that doesn't involve
copying the entire `TestingRestfulGateway.Builder`.
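
The limitation described here is a general property of Java builder inheritance: an inherited setter is declared to return the base builder type, so chaining it hides the subclass's own setters. One commonly used workaround is a self-typed ("curiously recurring") generic builder, sketched below with hypothetical class names. This is not how Flink's testing builders are written, and adopting it would mean moving the shared setters into an abstract generic base rather than copying them.

```java
// Hypothetical builders illustrating the self-typed generic builder pattern.
abstract class AbstractGatewayBuilder<T extends AbstractGatewayBuilder<T>> {
    protected String hostname = "localhost";

    // Each concrete builder returns itself with its own static type.
    protected abstract T self();

    // Inherited setters return T, so chaining keeps the subclass type.
    T setHostname(String hostname) {
        this.hostname = hostname;
        return self();
    }
}

final class RestfulGatewayBuilder extends AbstractGatewayBuilder<RestfulGatewayBuilder> {
    @Override
    protected RestfulGatewayBuilder self() {
        return this;
    }

    String build() {
        return "restful@" + hostname;
    }
}

final class DispatcherGatewayBuilder extends AbstractGatewayBuilder<DispatcherGatewayBuilder> {
    private int blobServerPort;

    @Override
    protected DispatcherGatewayBuilder self() {
        return this;
    }

    DispatcherGatewayBuilder setBlobServerPort(int blobServerPort) {
        this.blobServerPort = blobServerPort;
        return this;
    }

    String build() {
        return "dispatcher@" + hostname + ":" + blobServerPort;
    }
}
```

With this shape, `new DispatcherGatewayBuilder().setHostname("example").setBlobServerPort(6124).build()` compiles, because the inherited `setHostname` is typed to return `DispatcherGatewayBuilder`.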




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529673#comment-16529673
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6203
  
@tillrohrmann I've updated the PR.




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529666#comment-16529666
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199450747
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
--- End diff --

Will do once the PR is merged.
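
The TODO concerns where the serialized `JobGraph` is written: `Files.createTempFile(prefix, suffix)` always uses the JVM default temporary directory (`java.io.tmpdir`). A minimal sketch of a configurable variant, assuming the directory is simply passed in by the caller (how it would be read from Flink's configuration is left open); `TempSerialization` is a hypothetical name. It uses the `Files.createTempFile(dir, prefix, suffix)` overload and removes a partially written file if serialization fails.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: serialize an object into a temp file in a configurable directory.
final class TempSerialization {

    private TempSerialization() {}

    // If tmpDir is null, fall back to the JVM default temp directory (java.io.tmpdir).
    static Path serializeToTempFile(Serializable obj, Path tmpDir) throws IOException {
        Path file = tmpDir == null
                ? Files.createTempFile("flink-jobgraph", ".bin")
                : Files.createTempFile(tmpDir, "flink-jobgraph", ".bin");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(obj);
        } catch (IOException e) {
            // The stream is already closed here; do not leave a half-written file behind.
            try {
                Files.deleteIfExists(file);
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        return file;
    }
}
```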




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529608#comment-16529608
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199439130
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed to serialize JobGraph.", e);
}
 
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
+   for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
}
-   });
+
+   final CompletableFuture submitFuture = sendRetriableRequest(
--- End diff --

This is a slightly more extensive change since the cleanup needs access to
the `jobGraphFile`. I'll see what I can do.
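A JDK-only sketch of the split suggested in the review, and of why it affects cleanup: when the preparation step hands the temp file to the sending step as its result, the cleanup can be attached exactly where the file is in scope. `SplitSubmissionSketch`, `prepare`, `submit` and the `sender` function are hypothetical stand-ins (a `Serializable` payload plays the `JobGraph`, `sender` plays `sendRetriableRequest`); this is not the code that was merged.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class SplitSubmissionSketch {

    // Step 1 (blocking, runs on the executor): serialize the payload into a temp file.
    static Path prepare(Serializable payload) {
        final Path file;
        try {
            file = Files.createTempFile("flink-jobgraph", ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (OutputStream fileOut = Files.newOutputStream(file);
             ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(payload);
            return file;
        } catch (IOException e) {
            deleteQuietly(file);
            throw new UncheckedIOException(e);
        }
    }

    // Step 2: send, then clean up. The temp file is in scope for the cleanup
    // because it is the value passed from step 1 into step 2.
    static <T> CompletableFuture<T> submit(
            Serializable payload,
            Function<Path, CompletableFuture<T>> sender,
            Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> prepare(payload), executor)
            .thenCompose(file -> {
                final CompletableFuture<T> sent;
                try {
                    sent = sender.apply(file);
                } catch (RuntimeException e) {
                    deleteQuietly(file); // sender failed before returning a future
                    throw e;
                }
                // Runs after sending completes, successfully or exceptionally.
                return sent.whenComplete((result, failure) -> deleteQuietly(file));
            });
    }

    static void deleteQuietly(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException ignored) {
            // best effort: a leftover temp file is not fatal in this sketch
        }
    }
}
```

The design point is that `thenCompose` receives the file as its input, so no shared field or outer variable is needed to reach it during cleanup.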


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529561#comment-16529561
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199427903
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

It seems odd to modify the signature just to make the code prettier.
There's no use case for the handlers to return anything but null, so why
even allow it? Doesn't this go against the principle of using the most
restrictive interface?
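A JDK-only sketch of the narrower signature: if callers only need to know when processing has finished, a `CompletableFuture<Void>` expresses exactly that and leaves no value for anyone to depend on. `ProcessingSignalSketch`, `respond`, and the `StringBuilder` sink (standing in for writing the HTTP response) are hypothetical; this is not the actual `AbstractRestHandler` code.

```java
import java.util.concurrent.CompletableFuture;

final class ProcessingSignalSketch {

    // Completes the returned future once the response has been "written",
    // whether the response future succeeded or failed.
    static <P> CompletableFuture<Void> respond(CompletableFuture<P> response, StringBuilder sink) {
        CompletableFuture<Void> processingFinished = new CompletableFuture<>();
        response.whenComplete((resp, throwable) -> {
            if (throwable != null) {
                sink.append("error: ").append(throwable.getMessage());
            } else {
                sink.append(resp);
            }
            // Void carries no result: callers can only observe completion.
            processingFinished.complete(null);
        });
        return processingFinished;
    }
}
```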


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528913#comment-16528913
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199333574
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
+
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final 

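The new `handleRequest` moves JobGraph deserialization off the calling thread by reading the uploaded file inside `CompletableFuture.supplyAsync(..., executor)` and wrapping failures in a `CompletionException`. A JDK-only sketch of that pattern, assuming a hypothetical `readAsync` helper and using `IllegalArgumentException` as a stand-in for Flink's `RestHandlerException` with `BAD_REQUEST`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

final class AsyncDeserializationSketch {

    // Reads and deserializes the object stored in 'file' on 'executor',
    // so the file I/O does not run on the caller's thread.
    static <T> CompletableFuture<T> readAsync(Path file, Class<T> type, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try (InputStream fileIn = Files.newInputStream(file);
                 ObjectInputStream objectIn = new ObjectInputStream(fileIn)) {
                return type.cast(objectIn.readObject());
            } catch (IOException | ClassNotFoundException | ClassCastException e) {
                // Stand-in for RestHandlerException(..., BAD_REQUEST, e) in the handler.
                throw new CompletionException(new IllegalArgumentException(
                    "Failed to deserialize " + type.getSimpleName() + ".", e));
            }
        }, executor);
    }

    // Test helper: serializes 'value' into 'file'.
    static void write(Path file, Serializable value) {
        try (OutputStream fileOut = Files.newOutputStream(file);
             ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Unlike the quoted diff, the sketch declares the file stream as its own resource, so it is closed even if the `ObjectInputStream` constructor rejects the stream header.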
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528024#comment-16528024
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199236398
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
+
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527808#comment-16527808
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199178551
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
--- End diff --

I think we could combine these two `try` statements into one:
`new ObjectOutputStream(Files.newOutputStream(jobGraphFile))`.
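For reference, a JDK-only sketch of a combined form. Declaring both streams in one `try` header keeps the guarantee of the nested version: resources are closed in reverse order, and the file stream is still closed if the `ObjectOutputStream` constructor (which writes the stream header) throws. The single-resource form `new ObjectOutputStream(Files.newOutputStream(jobGraphFile))` is shorter, but in that failure case the inner file stream is never assigned to a resource variable and so is not closed. `SingleTrySketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

final class SingleTrySketch {

    // One try-with-resources header declaring both streams. They are closed in
    // reverse order (objectOut, then fileOut), and fileOut is still closed if
    // the ObjectOutputStream constructor fails while writing the stream header.
    static void writeObject(Path target, Serializable value) throws IOException {
        try (OutputStream fileOut = Files.newOutputStream(target);
             ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(value);
        }
    }
}
```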


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527825#comment-16527825
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199196388
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java ---
@@ -18,64 +18,118 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * We currently require the job-jars to be uploaded through the blob-server.
+ * This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+   private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+   private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+   private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";
 
-   /**
-* The serialized job graph.
-*/
-   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-   public final byte[] serializedJobGraph;
+   @JsonProperty(FIELD_NAME_JOB_GRAPH)
+   public final String jobGraphFileName;
 
-   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-   this(serializeJobGraph(jobGraph));
-   }
+   @JsonProperty(FIELD_NAME_JOB_JARS)
+   public final Collection jarFileNames;
+
+   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+   public final Collection artifactFileNames;
 
-   @JsonCreator
public JobSubmitRequestBody(
-   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
-   this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+   @JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName,
+   @JsonProperty(FIELD_NAME_JOB_JARS) Collection jarFileNames,
+   @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) Collection artifactFileNames) {
+   this.jobGraphFileName = jobGraphFileName;
+   this.jarFileNames = jarFileNames;
+   this.artifactFileNames = artifactFileNames;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+   return Objects.equals(jobGraphFileName, that.jobGraphFileName) &&
+   Objects.equals(jarFileNames, that.jarFileNames) &&
+   Objects.equals(artifactFileNames, that.artifactFileNames);
}
 
@Override
public int hashCode() {
-   return 71 * Arrays.hashCode(this.serializedJobGraph);
+   return Objects.hash(jobGraphFileName, jarFileNames, artifactFileNames);
}
 
@Override
-   public boolean equals(Object object) {
-   if (object instanceof JobSubmitRequestBody) {
-   JobSubmitRequestBody other = (JobSubmitRequestBody) object;
-   return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
-   }
-   return false;
+   public String toString() {
+   return "JobSubmitRequestBody{" +
+   "jobGraphFileName='" + jobGraphFileName + '\'' +
+   ", jarFileNames=" + jarFileNames +
+   ", artifactFileNames=" + artifactFileNames +
+   '}';
}
 
-   private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
-   try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
-   ObjectOutputStream out = new ObjectOutputStream(baos);
+   /**
+* Descriptor for a 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527821#comment-16527821
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199195443
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
--- End diff --

Moreover, we would no longer have to fully qualify the Flink `Path` type,
because there would no longer be an ambiguity.
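A small JDK-only sketch of that idea: if the handler keeps the uploaded `java.io.File` objects instead of converting them with `File::toPath`, `java.nio.file.Path` no longer needs to be imported, so an unqualified `Path` could refer to Flink's `org.apache.flink.core.fs.Path`. Only the lookup side is shown; `UploadIndexSketch`, `indexByName` and `requireUpload` are hypothetical helpers, not methods from the pull request.

```java
import java.io.File;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class UploadIndexSketch {

    // Index uploaded files by file name, keeping java.io.File as the value type
    // so no java.nio.file.Path import is needed.
    static Map<String, File> indexByName(Collection<File> uploadedFiles) {
        return uploadedFiles.stream()
            .collect(Collectors.toMap(File::getName, Function.identity()));
    }

    // Look up a file the request body refers to; fail if it was not uploaded.
    static File requireUpload(Map<String, File> nameToFile, String fileName, String fileType) {
        File file = nameToFile.get(fileName);
        if (file == null) {
            throw new IllegalArgumentException(fileType + " file " + fileName + " was not uploaded.");
        }
        return file;
    }
}
```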


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527814#comment-16527814
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199187385
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
--- End diff --

Alternatively, we could call `File::getName`.
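Both key extractors return the last path segment, so `File::getName` can replace `path -> path.toPath().getFileName().toString()`. A related detail worth checking in the handler: `Collectors.toMap` without a merge function throws `IllegalStateException` on duplicate keys, so two uploads with the same file name fail inside the stream before the `uploadedFiles.size() != nameToFile.size()` check can run. The JDK-only sketch below demonstrates both points; `FileNameKeySketch`, `keysAgree` and `byName` are hypothetical names.

```java
import java.io.File;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

final class FileNameKeySketch {

    // True if File::getName and the Path-based extractor yield the same key.
    static boolean keysAgree(File file) {
        return file.getName().equals(file.toPath().getFileName().toString());
    }

    // Builds the name -> file map with File::getName. Without a merge function,
    // duplicate names make Collectors.toMap throw IllegalStateException.
    static Map<String, File> byName(Collection<File> files) {
        return files.stream().collect(Collectors.toMap(File::getName, f -> f));
    }
}
```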


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527811#comment-16527811
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199182588
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed to serialize JobGraph.", e);
}
 
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
+   for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
}
-   });
+
+   final CompletableFuture submitFuture = sendRetriableRequest(
--- End diff --

Shall we split the resource preparation and the sending of the actual
request into separate steps? Then this `Supplier` would be much simpler:
```
CompletableFuture> requestFuture = CompletableFuture.supplyAsync(() -> ..., executorService);
```

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527830#comment-16527830
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199198501
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
--- End diff --

Maybe replace by `TestingDispatcherGateway`.
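The idea behind that suggestion, sketched without Flink: instead of a Mockito mock, the test uses a small hand-written gateway whose behavior is injected as a function, so each test states exactly what the fake does. `SubmitGateway` and `TestingSubmitGateway` are hypothetical stand-ins; the actual API of Flink's `TestingDispatcherGateway` is not reproduced here.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical gateway interface: only the call the handler under test needs.
interface SubmitGateway {
    CompletableFuture<String> submitJob(String jobName);
}

// Hand-written test double: behavior is supplied by the test, with a default.
final class TestingSubmitGateway implements SubmitGateway {
    private final Function<String, CompletableFuture<String>> submitFunction;

    TestingSubmitGateway(Function<String, CompletableFuture<String>> submitFunction) {
        this.submitFunction = Objects.requireNonNull(submitFunction);
    }

    // Default behavior: acknowledge every submission.
    TestingSubmitGateway() {
        this(jobName -> CompletableFuture.completedFuture("acknowledged"));
    }

    @Override
    public CompletableFuture<String> submitJob(String jobName) {
        return submitFunction.apply(jobName);
    }
}
```

A hand-written double like this keeps the fake's behavior visible in the test itself and must be updated together with the interface, which is the usual argument for preferring it over a mock.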


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527819#comment-16527819
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199190292
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
+   jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) {
+   Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+   Map temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
+   Collection jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
+
+   Collection> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
+   
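The diff above combines two independent futures: deserializing the `JobGraph` from the uploaded file and asking the gateway for the blob server port. `CompletableFuture.supplyAsync(Supplier)` without an executor argument runs on the JDK's default async executor (normally `ForkJoinPool.commonPool()`), which is what the `// TODO: use executor` is about. A minimal sketch of the pattern with an explicit executor; `loadGraphName` and the string result are hypothetical stand-ins for the deserialization step and the finalized job graph.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CombineSketch {

    // Hypothetical stand-in for reading and deserializing the JobGraph file (blocking I/O).
    static String loadGraphName(String fileName) {
        return "graph-from-" + fileName;
    }

    // Runs the blocking step on the given executor and pairs its result with the port future.
    static CompletableFuture<String> prepareSubmission(
            String jobGraphFileName,
            CompletableFuture<Integer> blobServerPortFuture,
            ExecutorService ioExecutor) {
        CompletableFuture<String> jobGraphFuture =
                CompletableFuture.supplyAsync(() -> loadGraphName(jobGraphFileName), ioExecutor);
        // thenCombine waits for both futures; if either one fails, the combined future fails too.
        return jobGraphFuture.thenCombine(
                blobServerPortFuture,
                (graph, port) -> graph + "@localhost:" + port);
    }
}
```

Passing a dedicated executor keeps the blocking file read off the shared common pool, and the combining function only runs once both inputs are available.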

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527809#comment-16527809
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199179398
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed to serialize JobGraph.", e);
--- End diff --

I think the failure message `"Failed to serialize JobGraph."` should go into a dedicated exception, because completion exceptions can be stripped when the future's result is unwrapped: `throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e))`
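The concern can be reproduced with the JDK alone: when a `supplyAsync` task throws a `CompletionException` that has a cause, `CompletableFuture.get()` reports only that cause inside the `ExecutionException`, so the message attached to the `CompletionException` is dropped. A self-contained sketch; `SerializationFailure` is a hypothetical stand-in for `FlinkException`, and `failingSerialize` simulates the I/O error.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class CompletionMessageSketch {

    // Hypothetical stand-in for FlinkException: a dedicated exception that carries the context message.
    static final class SerializationFailure extends Exception {
        SerializationFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Simulates the JobGraph serialization failing with a low-level I/O error.
    static Object failingSerialize() throws IOException {
        throw new IOException("disk full");
    }

    // Returns the message of the cause that get() reports, depending on how the IOException is wrapped.
    static String reportedMessage(boolean useDedicatedException) {
        CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
            try {
                return failingSerialize();
            } catch (IOException e) {
                if (useDedicatedException) {
                    throw new CompletionException(new SerializationFailure("Failed to serialize JobGraph.", e));
                }
                throw new CompletionException("Failed to serialize JobGraph.", e);
            }
        }, Runnable::run); // run on the calling thread to keep the sketch deterministic

        try {
            future.get();
            throw new AssertionError("expected the future to fail");
        } catch (ExecutionException e) {
            // get() unwraps the CompletionException and exposes only its cause.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With the message on the `CompletionException`, callers only see `"disk full"`; with the dedicated wrapper, they see `"Failed to serialize JobGraph."`.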




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527823#comment-16527823
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199195959
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ---
@@ -152,6 +152,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque
},
ctx.executor());
 
+   CompletableFuture processingFinishedFuture = new CompletableFuture<>();
--- End diff --

Let's remove this future and simply return the `whenComplete` return value.
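`whenComplete` already returns a new future that completes with the same result or exception as the original, but only after the callback has run, so a separately created future that the callback completes by hand is redundant. A minimal sketch of both variants; the `cleanedUp` flag is a hypothetical stand-in for the handler's post-transfer work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class WhenCompleteSketch {

    // Before: an extra future that the callback completes manually.
    static CompletableFuture<Void> withExtraFuture(CompletableFuture<Void> transferFuture, AtomicBoolean cleanedUp) {
        CompletableFuture<Void> processingFinishedFuture = new CompletableFuture<>();
        transferFuture.whenComplete((ignored, failure) -> {
            cleanedUp.set(true);
            if (failure != null) {
                processingFinishedFuture.completeExceptionally(failure);
            } else {
                processingFinishedFuture.complete(null);
            }
        });
        return processingFinishedFuture;
    }

    // After: return the future produced by whenComplete directly.
    static CompletableFuture<Void> withWhenComplete(CompletableFuture<Void> transferFuture, AtomicBoolean cleanedUp) {
        return transferFuture.whenComplete((ignored, failure) -> cleanedUp.set(true));
    }
}
```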




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527817#comment-16527817
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199188036
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
+
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
--- End diff --

In `getPathAndAssertUpload` the parameters are in the opposite order. It would be cool to make this consistent.
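A sketch of the two lookups with a consistent parameter order (requested name or names first, the `nameToFile` map last). The helper bodies are hypothetical, since the diff does not show them; `IllegalArgumentException` stands in for `RestHandlerException`, and `java.nio.file.Path` is used throughout.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

final class UploadLookupSketch {

    // Hypothetical helper: resolves one requested file name against the uploaded files.
    static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> nameToFile) {
        Path file = nameToFile.get(fileName);
        if (file == null) {
            throw new IllegalArgumentException(
                    type + " file " + fileName + " could not be found among the uploaded files.");
        }
        return file;
    }

    // Hypothetical helper with the same order: names first, lookup map last.
    static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFile) {
        Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size());
        for (String jarFileName : jarFileNames) {
            jarFiles.add(getPathAndAssertUpload(jarFileName, "Jar", nameToFile));
        }
        return jarFiles;
    }
}
```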



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527807#comment-16527807
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199177926
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
--- End diff --

Let's create a follow-up story for it and remove the TODO.
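For the follow-up, `java.nio.file.Files.createTempFile` has an overload that takes the target directory as its first argument, so a configurable location mostly means passing a directory through. A minimal sketch, assuming the directory comes from some configuration value (the `tmpDir` parameter is hypothetical); it also declares both streams in a single try-with-resources instead of two nested `try` blocks.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

final class TempFileSketch {

    // Serializes the object into a new temporary file inside tmpDir (hypothetically taken from configuration).
    static Path writeToTempFile(Serializable object, Path tmpDir) throws IOException {
        Path file = Files.createTempFile(tmpDir, "flink-jobgraph", ".bin");
        // One try-with-resources declares both streams; they are closed in reverse order.
        try (OutputStream fileOut = Files.newOutputStream(file);
             ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(object);
        } catch (IOException e) {
            // Do not leave a half-written file behind.
            Files.deleteIfExists(file);
            throw e;
        }
        return file;
    }

    // Reads the object back, as the REST handler does with the uploaded file.
    static Object readFromFile(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(file))) {
            return objectIn.readObject();
        }
    }
}
```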




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527820#comment-16527820
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199191206
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
+
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527813#comment-16527813
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199187175
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
--- End diff --

Let's add an explicit parameter type: `(File file) -> ...`
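With an explicit parameter type the element type is visible at the call site, which helps here because the lambda parameter is named `path` while the stream actually contains `File`s. A self-contained sketch of the suggested form; the file names in the usage are made up for the example.

```java
import java.io.File;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

final class NameToFileSketch {

    // Maps each uploaded file's name (without directories) to its path.
    static Map<String, Path> nameToFile(Collection<File> uploadedFiles) {
        return uploadedFiles.stream().collect(Collectors.toMap(
                (File file) -> file.toPath().getFileName().toString(),
                File::toPath));
    }
}
```

Note that `Collectors.toMap` without a merge function throws `IllegalStateException` when two uploaded files share the same name.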




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527831#comment-16527831
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199199366
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
 
-   JobGraph job = new JobGraph("testjob");
-   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
 
-   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway)
.get();
}
+
+   @Test
+   public void testRejectionOnCountMismatch() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+   final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath();
+
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
+   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT,
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
+
+   JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())), mockGateway)
+   .get();
+   } catch (Exception e) {
+   ExceptionUtils.findThrowable(e, candidate -> candidate instanceof RestHandlerException && candidate.getMessage().contains("count"));
+   }
+   }
+
+   @Test
+   public void testFileHandling() throws Exception {
+   final String dcEntryName = "entry";
+
+   

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527815#comment-16527815
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199187606
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
--- End diff --

Maybe make this `final`.



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527828#comment-16527828
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199198055
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
 
-   JobGraph job = new JobGraph("testjob");
-   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
 
-   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway)
.get();
}
+
+   @Test
+   public void testRejectionOnCountMismatch() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+   final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath();
+
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
+   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
--- End diff --

This is duplicate code; can we avoid it?
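One way to remove the duplication is to move the repeated file setup into a shared helper that both tests call. The following is a minimal sketch, not the PR's code. It assumes a plain temporary file created with `java.nio` instead of the test's `TEMPORARY_FOLDER` rule. The names `JobGraphFileFixture`, `writeSerialized` and `readSerialized` are hypothetical, and a `String` stands in for the `JobGraph`:

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical shared fixture: each test calls writeSerialized(...) once
// instead of repeating the stream setup inline.
final class JobGraphFileFixture {

    private JobGraphFileFixture() {
    }

    /** Serializes the object into a new temporary file and returns its path. */
    static Path writeSerialized(Serializable object) throws IOException {
        Path file = Files.createTempFile("jobgraph", ".bin");
        file.toFile().deleteOnExit();
        try (OutputStream fileOut = Files.newOutputStream(file);
                ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(object);
        }
        return file;
    }

    /** Reads the object back, e.g. to check the round trip. */
    static Object readSerialized(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(file))) {
            return objectIn.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = writeSerialized("testjob");
        System.out.println(readSerialized(file)); // prints testjob
    }
}
```

The mock gateway stubbing could be extracted the same way, for example into a method that returns the configured gateway.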



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527826#comment-16527826
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199192714
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
--- End diff --

Let's call it `FILE_TYPE_JOB_GRAPH`.




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527827#comment-16527827
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199197307
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -47,8 +65,30 @@
  */
 public class JobSubmitHandlerTest extends TestLogger {
 
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+   private static BlobServer blobServer;
+
+   @BeforeClass
+   public static void setup() throws IOException {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+   blobServer = new BlobServer(config, new VoidBlobStore());
+   blobServer.start();
+   }
+
+   @AfterClass
+   public static void teardown() throws IOException {
+   if (blobServer != null) {
+   blobServer.close();
+   }
+   }
+
@Test
public void testSerializationFailureHandling() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
--- End diff --

Here we could instead use `() -> new CompletableFuture()`.
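A lambda can replace the mock only if the retriever type has a single abstract method, which this suggestion implies. The following is a minimal sketch of the idea. `FutureRetriever<T>` is a hypothetical interface shaped like that; it is not Flink's `GatewayRetriever`, whose full method set may differ:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical single-abstract-method interface standing in for the retriever.
@FunctionalInterface
interface FutureRetriever<T> {
    CompletableFuture<T> getFuture();
}

final class RetrieverLambdaDemo {

    public static void main(String[] args) {
        // Instead of mock(FutureRetriever.class), supply the behavior directly.
        // Each call returns a fresh future that never completes, which should be
        // enough for a test that never asks the retriever for a gateway.
        FutureRetriever<String> neverResolving = () -> new CompletableFuture<>();

        System.out.println(neverResolving.getFuture().isDone()); // prints false
    }
}
```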




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527812#comment-16527812
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199186445
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
@@ -79,6 +79,7 @@ protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpReque
response = FutureUtils.completedExceptionally(e);
}
 
+   CompletableFuture processingFinishedFuture = new CompletableFuture<>();
response.whenComplete((P resp, Throwable throwable) -> {
--- End diff --

Can't we simply return the result of the `whenComplete` stage here? We 
would just need to change the return type of `respondToRequest` to 
`CompletableFuture`.
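This suggestion relies on how `whenComplete` behaves: the stage it returns completes only after the callback has run, and it carries the original result or exception. The following is a minimal sketch using plain `CompletableFuture<String>` values. The `respondToRequest` signature and the `sent` list are illustrative stand-ins for writing the HTTP response, not the handler's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class WhenCompleteDemo {

    // Illustrative stand-in: 'sent' records what would be written to the channel.
    static CompletableFuture<String> respondToRequest(CompletableFuture<String> response, List<String> sent) {
        // No separate processingFinishedFuture is needed: the stage returned by
        // whenComplete completes once this callback has finished.
        return response.whenComplete((resp, throwable) -> {
            if (throwable != null) {
                sent.add("error");
            } else {
                sent.add("ok: " + resp);
            }
        });
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> finished = respondToRequest(pending, sent);

        System.out.println(finished.isDone()); // false: no response yet
        pending.complete("job submitted");
        System.out.println(finished.isDone() + " " + sent); // true [ok: job submitted]
    }
}
```

If `response` fails, the returned stage also completes exceptionally. A caller that only needs a completion signal might therefore prefer `handle(...)`.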




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527829#comment-16527829
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199198445
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getHostname()).thenReturn("localhost");
+   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   TestingUtils.defaultExecutor());
 
-   JobGraph job = new JobGraph("testjob");
-   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
 
-   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway)
.get();
}
+
+   @Test
+   public void testRejectionOnCountMismatch() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(new JobGraph("testjob"));
+   }
+   }
+   final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath();
+
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
--- End diff --

Maybe we could replace it with the `TestingDispatcherGateway`.
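A dedicated testing gateway has an advantage over Mockito: each behavior is an explicit, typed function instead of `when(...).thenReturn(...)` stubbing. The following is a minimal sketch of that pattern. It uses a hypothetical two-method `SubmissionGateway` interface and does not reproduce the actual `TestingDispatcherGateway` API. The port number is arbitrary:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical gateway interface, reduced to the calls the handler test stubs.
interface SubmissionGateway {
    CompletableFuture<Integer> getBlobServerPort();

    CompletableFuture<String> submitJob(String jobName);
}

// Hand-written test double: behavior is injected as functions instead of mocked.
final class TestingSubmissionGateway implements SubmissionGateway {

    private final Supplier<CompletableFuture<Integer>> blobServerPortSupplier;
    private final Function<String, CompletableFuture<String>> submitFunction;

    TestingSubmissionGateway(
            Supplier<CompletableFuture<Integer>> blobServerPortSupplier,
            Function<String, CompletableFuture<String>> submitFunction) {
        this.blobServerPortSupplier = blobServerPortSupplier;
        this.submitFunction = submitFunction;
    }

    @Override
    public CompletableFuture<Integer> getBlobServerPort() {
        return blobServerPortSupplier.get();
    }

    @Override
    public CompletableFuture<String> submitJob(String jobName) {
        return submitFunction.apply(jobName);
    }

    public static void main(String[] args) {
        // One shared instance could replace the duplicated mock setup in both tests.
        SubmissionGateway gateway = new TestingSubmissionGateway(
                () -> CompletableFuture.completedFuture(6124),
                jobName -> CompletableFuture.completedFuture("ack:" + jobName));

        System.out.println(gateway.getBlobServerPort().join()); // 6124
        System.out.println(gateway.submitJob("testjob").join()); // ack:testjob
    }
}
```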




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527816#comment-16527816
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199188278
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
+
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
--- End diff --

Let's add 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527822#comment-16527822
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199197524
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
+   final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
--- End diff --

These tries could be collapsed.
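Java's try-with-resources accepts several resources separated by semicolons. It closes them in reverse declaration order, which matches what the nested form does, so collapsing the two blocks should not change behavior. The following is a minimal sketch. `Tracked` is a hypothetical resource that only records the close order, and the comment shows the collapsed form of the test's two blocks:

```java
import java.util.ArrayList;
import java.util.List;

final class CollapsedTryDemo {

    // Hypothetical resource that records when it is closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> closed;

        Tracked(String name, List<String> closed) {
            this.name = name;
            this.closed = closed;
        }

        @Override
        public void close() {
            closed.add(name);
        }
    }

    static List<String> closeOrder() {
        List<String> closed = new ArrayList<>();
        // Collapsed form, analogous to:
        // try (OutputStream fileOut = Files.newOutputStream(jobGraphFile);
        //      ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { ... }
        try (Tracked fileOut = new Tracked("fileOut", closed);
                Tracked objectOut = new Tracked("objectOut", closed)) {
            closed.add("body");
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(closeOrder()); // [body, objectOut, fileOut]
    }
}
```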




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527818#comment-16527818
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199189017
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
+   ));
+
+   if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+   uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+   nameToFile.size(),
+   uploadedFiles.size()),
+   HttpResponseStatus.BAD_REQUEST
+   );
+   }
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile);
+
+   CompletableFuture jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   }, executor);
+
+   Collection jarFiles = getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
+
+   Collection> artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
+
+   CompletableFuture blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527810#comment-16527810
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199181211
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
 
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+   List jarFileNames = new ArrayList<>(8);
+   List artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new ArrayList<>(8);
 
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-   } catch (Exception e) {
-   throw new CompletionException(e);
+   jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+   objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new CompletionException("Failed to serialize JobGraph.", e);
}
 
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
+   for (Path jar : jobGraph.getUserJars()) {
+   jarFileNames.add(jar.getName());
+   filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+   }
 
-   try {
-   return sendRequest(
-   JobSubmitHeaders.getInstance(),
-   new JobSubmitRequestBody(jobGraph));
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
+   for (Map.Entry artifacts : jobGraph.getUserArtifacts().entrySet()) {
+   artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+   filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
}
-   });
+
+   final CompletableFuture submitFuture = sendRetriableRequest(
+   JobSubmitHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   new JobSubmitRequestBody(
+   

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527824#comment-16527824
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199195175
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -34,38 +40,137 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler {
 
+   private static final String FILE_TYPE_GRAPH = "JobGraph";
+   private static final String FILE_TYPE_JAR = "Jar";
+   private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+   private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+   this.executor = executor;
}
 
@Override
protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.toPath().getFileName().toString(),
+   File::toPath
--- End diff --

I think we can completely remove the `nio.Path` dependency if we use 
`Path::fromLocalFile` with `Path` being Flink's path implementation. The 
benefit would be that we would not have to change types in the 
`getJarFilesToUpload` and `getArtifactFilesToUpload` methods. Instead of 
`Files.newInputStream(jobGraphFile)` we would have to call 
`FileSystem.getLocalFileSystem().open(jobGraphFile)`.




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527476#comment-16527476
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6203
  
@tillrohrmann I've rebased the PR and addressed the remaining issues. I've 
switched to using `File` collections in the last 3 commits.



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527326#comment-16527326
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199094230
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

What about `ClientUtils`, should they also accept `File` collections?



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527289#comment-16527289
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r199077598
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

So how far do you propose to go? Should `HandlerRequest` also receive a 
`Collection<File>`, or convert the existing `Collection<Path>`? Should 
`FileUploads#getUploadedFiles` return a `Collection<File>`?



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523902#comment-16523902
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6203
  
It appears that the upload of large mixed multipart messages (I tried 64 MB) 
currently fails, which caused the scala-shell failures. I'm currently 
investigating, but unfortunately I can't reproduce it locally...



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523838#comment-16523838
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r198178787
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,42 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
+
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new RuntimeException("lol", e);
--- End diff --

needs a proper exception
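One way to address this is the pattern the removed code already used (`throw new CompletionException(new FlinkException("Could not upload job files.", ioe))`): wrap the `IOException` in a `CompletionException` whose cause carries a descriptive message, so the failure propagates through the `CompletableFuture` chain. The stdlib-only sketch below assumes a `String` stands in for the `JobGraph` and a plain `IOException` stands in for Flink's `FlinkException`. `JobGraphFiles`, `writeToTempFile`, and `readFromFile` are hypothetical names. It also shows the matching read side that the handler performs.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JobGraphFiles {

    /** Serializes the object into a new temp file; I/O failures become CompletionExceptions. */
    static Path writeToTempFile(Serializable jobGraph) {
        try {
            Path file = Files.createTempFile("flink-jobgraph", ".bin");
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
                out.writeObject(jobGraph);
            }
            return file;
        } catch (IOException e) {
            // Descriptive, typed cause instead of a generic RuntimeException.
            throw new CompletionException(
                new IOException("Could not serialize the JobGraph to a temporary file.", e));
        }
    }

    /** Deserializes the object from the given file, as the handler side would. */
    static Object readFromFile(Path file) {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new CompletionException(
                new IOException("Could not deserialize the JobGraph from " + file + ".", e));
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Object> roundTrip = CompletableFuture
            .supplyAsync(() -> writeToTempFile("example-job-graph"))
            .thenApply(file -> {
                try {
                    return readFromFile(file);
                } finally {
                    file.toFile().delete(); // best-effort removal of the temp file
                }
            });
        System.out.println(roundTrip.get()); // prints example-job-graph
    }
}
```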



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523319#comment-16523319
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r198033040
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

I agree that nio `Paths` are more powerful. I'm just wondering whether we 
actually need this flexibility here. Usually it is a good idea to make the 
interface as restrictive as possible and widen it on demand.

Moreover, there is also Flink's `Path`, which comes with slightly different 
semantics. For example, it has the safety net for closing open file streams, 
which is also used to interrupt I/O operations that are otherwise not 
interruptible. Mixing the nio paths in there might give the false impression 
that this also applies to them.

So I'm just saying that we should make this decision consciously and not 
only based on which type is more convenient to use.



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523309#comment-16523309
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6203
  
Good point. I see two solutions here: Either we let the `AbstractHandler` 
implementation which supports file upload be responsible for deleting the files 
once it has processed the request, or we register a `whenComplete` handler on 
the response future and execute the cleanup in this handler once the response 
is complete.



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522086#comment-16522086
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197735309
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
--- End diff --

I was wondering when you might bring that up :)

I'm not a fan of exposing `File`.
* whether it _really is a local file_ shouldn't be relevant to the handler
* nio Paths are more flexible than files; for example if the 
DistributedCache were to return files we wouldn't have to extract zips as we 
could mount it with a `ZipFileSystem` instead
* always having to convert using `File#toPath` is rather tedious
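To make the `ZipFileSystem` argument concrete: with a nio `Path`, code can read an entry inside a zip archive without extracting it, whereas `Path#toFile` only works for the default file system and throws `UnsupportedOperationException` otherwise. The stdlib sketch below builds a small zip in a temp file and mounts it. `ZipPathDemo` and the entry name `data.txt` are made up for illustration, and nothing here reflects how Flink's `DistributedCache` actually works.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ZipPathDemo {

    /** Creates a zip archive containing a single text entry. */
    static Path createZip(String entryName, String content) throws IOException {
        Path zip = Files.createTempFile("artifact", ".zip");
        try (OutputStream fileOut = Files.newOutputStream(zip);
             ZipOutputStream zipOut = new ZipOutputStream(fileOut)) {
            zipOut.putNextEntry(new ZipEntry(entryName));
            zipOut.write(content.getBytes(StandardCharsets.UTF_8));
            zipOut.closeEntry();
        }
        return zip;
    }

    /** Reads an entry through a mounted zip file system; nothing is extracted to disk. */
    static String readEntry(Path zip, String entryName) throws IOException {
        try (FileSystem zipFs = FileSystems.newFileSystem(zip, (ClassLoader) null)) {
            return new String(Files.readAllBytes(zipFs.getPath(entryName)), StandardCharsets.UTF_8);
        }
    }

    /** Returns whether a java.io.File view exists for the entry (it does not for zip paths). */
    static boolean hasFileView(Path zip, String entryName) throws IOException {
        try (FileSystem zipFs = FileSystems.newFileSystem(zip, (ClassLoader) null)) {
            zipFs.getPath(entryName).toFile();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path zip = createZip("data.txt", "hello from inside the zip");
        System.out.println(readEntry(zip, "data.txt"));   // hello from inside the zip
        System.out.println(hasFileView(zip, "data.txt")); // false
        Files.delete(zip);
    }
}
```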



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521025#comment-16521025
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6203
  
There's a problem with how we designed the `FileUploads` cleanup. The 
`AbstractHandler` cleans it up when `AbstractHandler#respondToRequest` returns. 
There is, however, no guarantee that the request has been fully processed at this 
point, since `AbstractRestHandler` frequently processes requests asynchronously 
using `CompletableFuture`s. As a result, we may end up deleting files _before_ they 
have been processed or _while_ they are being processed.
It appears we can only handle the cleanup in `AbstractHandler` if we let 
`AbstractHandler#respondToRequest` return a `CompletableFuture` that signals when 
request processing has finished.
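A minimal sketch of the proposed fix, assuming a heavily simplified handler hierarchy. Here `respondToRequest` returns a `CompletableFuture` that completes only once asynchronous processing is done, and the base class deletes the uploaded files in a `whenComplete` callback. Cleanup therefore runs after success or failure, but never before the subclass has finished reading the files. `BaseHandler`, `UploadSizeHandler`, and their signatures are hypothetical and much simpler than Flink's `AbstractHandler`/`AbstractRestHandler`. Because the future returned by `whenComplete` completes only after the callback has run, a caller waiting on it also observes the deletion.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

abstract class BaseHandler {

    /** Subclasses return a future that completes once they no longer need the uploads. */
    protected abstract CompletableFuture<String> respondToRequest(Collection<Path> uploads);

    /** Entry point: cleanup is tied to the returned future, not to the method returning. */
    public final CompletableFuture<String> handle(Collection<Path> uploads) {
        return respondToRequest(uploads).whenComplete((result, failure) -> {
            for (Path upload : uploads) {
                try {
                    Files.deleteIfExists(upload);
                } catch (IOException e) {
                    // Best-effort cleanup; a real handler would log this.
                }
            }
        });
    }
}

class UploadSizeHandler extends BaseHandler {
    @Override
    protected CompletableFuture<String> respondToRequest(Collection<Path> uploads) {
        // Reads the files asynchronously, i.e. after respondToRequest has already returned.
        return CompletableFuture.supplyAsync(() -> {
            long total = 0;
            for (Path upload : uploads) {
                try {
                    total += Files.size(upload);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            return total + " bytes";
        });
    }
}

public class CleanupDemo {
    public static void main(String[] args) throws Exception {
        Path upload = Files.createTempFile("upload", ".jar");
        Files.write(upload, new byte[] {1, 2, 3});
        List<Path> uploads = new ArrayList<>();
        uploads.add(upload);

        String response = new UploadSizeHandler().handle(uploads).get();
        System.out.println(response);             // 3 bytes
        System.out.println(Files.exists(upload)); // false
    }
}
```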



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520539#comment-16520539
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197493894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+ 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520535#comment-16520535
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197492045
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+ 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520513#comment-16520513
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197487718
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
-   JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-   jobGraph = (JobGraph) objectIn.readObject();
-   } catch (Exception e) {
-   throw new RestHandlerException(
-   "Failed to deserialize JobGraph.",
-   HttpResponseStatus.BAD_REQUEST,
-   e);
+   Collection uploadedFiles = request.getUploadedFiles();
+   Map nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
+   path -> path.getFileName().toString(),
+   entry -> entry
+   ));
+
+   JobSubmitRequestBody requestBody = request.getRequestBody();
+
+   Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+   Collection jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
+   for (String jarFileName : requestBody.jarFileNames) {
+   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
+   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
+   }
+
+   Collection> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+   for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
+   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString())));
}
 
-   return gateway.submitJob(jobGraph, timeout)
-   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+   Map 
temporaryHack = artifacts.stream()
+   .collect(Collectors.toMap(
+   tuple -> tuple.f0,
+   // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
+   // blame whoever wrote the ClientUtils API
+   tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+   ));
+
+   // TODO: use executor
+   CompletableFuture jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
+   JobGraph jobGraph;
+   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   throw new CompletionException(new 
RestHandlerException(
+   "Failed to deserialize JobGraph.",
+   HttpResponseStatus.BAD_REQUEST,
+   e));
+   }
+   return jobGraph;
+   });
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
+   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
+   Collection jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+   ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
+
+   Collection> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
+   ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
+   

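The handler resolves every file name listed in the request body against the files that actually arrived, via `getPathAndAssertUpload`. That helper's body is not part of the quoted hunk, so the following is a minimal, hypothetical sketch of the lookup in plain Java; a plain `IllegalArgumentException` stands in for Flink's `RestHandlerException`, and the class name `UploadLookup` is made up for illustration.

```java
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the getPathAndAssertUpload helper referenced in the diff.
final class UploadLookup {

    private UploadLookup() {
    }

    // Index received files by their bare file name, mirroring the nameToFile map in the diff.
    // Note: Collectors.toMap throws IllegalStateException if two uploads share a file name.
    static Map<String, Path> indexByFileName(Collection<Path> uploadedFiles) {
        return uploadedFiles.stream().collect(Collectors.toMap(
            path -> path.getFileName().toString(),
            Function.identity()));
    }

    // Return the uploaded file for the given name, or fail if the client never sent it.
    static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> nameToFile) {
        Path path = nameToFile.get(fileName);
        if (path == null) {
            throw new IllegalArgumentException(
                String.format("File %s referenced as %s was not uploaded.", fileName, type));
        }
        return path;
    }
}
```

Failing fast on a missing name keeps a malformed request from reaching the blob upload stage at all.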
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520484#comment-16520484
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197477484
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
 
 	@Override
 	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-		JobGraph jobGraph;
-		try {
-			ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-			jobGraph = (JobGraph) objectIn.readObject();
-		} catch (Exception e) {
-			throw new RestHandlerException(
-				"Failed to deserialize JobGraph.",
-				HttpResponseStatus.BAD_REQUEST,
-				e);
+		Collection<Path> uploadedFiles = request.getUploadedFiles();
+		Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+			path -> path.getFileName().toString(),
+			entry -> entry
+		));
+
+		JobSubmitRequestBody requestBody = request.getRequestBody();
+
+		Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+		Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
+		for (String jarFileName : requestBody.jarFileNames) {
+			Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
+			jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
+		}
+
+		Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+		for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) {
+			Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+			artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
 		}
 
-		return gateway.submitJob(jobGraph, timeout)
-			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+		Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream()
+			.collect(Collectors.toMap(
+				tuple -> tuple.f0,
+				// the actual entry definition is mostly irrelevant as only the blobkey is accessed
+				// blame whoever wrote the ClientUtils API
+				tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+			));
+
+		// TODO: use executor
+		CompletableFuture<JobGraph> jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+			JobGraph jobGraph;
+			try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+				jobGraph = (JobGraph) objectIn.readObject();
+			} catch (Exception e) {
+				throw new CompletionException(new RestHandlerException(
+					"Failed to deserialize JobGraph.",
+					HttpResponseStatus.BAD_REQUEST,
+					e));
+			}
+			return jobGraph;
+		});
+
+		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+		CompletableFuture<JobGraph> finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+			final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+			try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
+				Collection<PermanentBlobKey> jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+				ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
+
+				Collection<Tuple2<String, PermanentBlobKey>> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient);
+				ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
+

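The new handler body no longer deserializes the `JobGraph` on the calling thread: it reads the graph asynchronously, joins it with the asynchronously fetched blob server port via `thenCombine`, and wraps deserialization failures in a `CompletionException`. The sketch below isolates that pattern with plain JDK types only; `String` stands in for `JobGraph` and a completed future stands in for `gateway.getBlobServerPort(timeout)`, both assumptions for illustration. It passes an explicit `Executor`, which is one way to address the diff's `// TODO: use executor`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical illustration of the supplyAsync + thenCombine shape used in the handler.
final class CombineSketch {

    private CombineSketch() {
    }

    // Reads the "graph" on the given executor and combines it with the port once both are available.
    static CompletableFuture<String> prepare(
            Supplier<String> graphReader,
            CompletableFuture<Integer> portFuture,
            Executor executor) {
        CompletableFuture<String> graphFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return graphReader.get();
            } catch (RuntimeException e) {
                // Mirror the diff: wrap the failure so callers observe a CompletionException with a meaningful cause.
                throw new CompletionException(new IllegalStateException("Failed to deserialize JobGraph.", e));
            }
        }, executor);

        // The combiner runs only after both futures completed successfully;
        // if either fails, the combined future fails with the same CompletionException.
        return graphFuture.thenCombine(portFuture, (graph, port) -> graph + "@blob:" + port);
    }
}
```

Because the combined future fails as soon as either input fails, a bad job graph never triggers a connection to the blob server.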
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520412#comment-16520412
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197461746
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520395#comment-16520395
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r197456037
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516986#comment-16516986
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6147


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
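The proposal boils down to a server-side loop: for each jar the client uploaded earlier, push the file to the blob server, collect the returned blob key, and record it on the job graph before submission. The sketch below models that flow with hypothetical in-memory types (`BlobStore`, `JobGraphStub`, `ServerSideJarUpload`); they are assumptions for illustration, not Flink's `BlobClient` or `JobGraph` API, and random UUIDs stand in for real blob keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory blob store: returns a fresh key for every stored payload.
final class BlobStore {
    private final Map<String, byte[]> blobs = new HashMap<>();

    String put(byte[] content) {
        String key = UUID.randomUUID().toString();
        blobs.put(key, content.clone());
        return key;
    }

    boolean contains(String key) {
        return blobs.containsKey(key);
    }
}

// Hypothetical job graph that only tracks the blob keys of its user jars.
final class JobGraphStub {
    private final List<String> userJarBlobKeys = new ArrayList<>();

    void addUserJarBlobKey(String key) {
        userJarBlobKeys.add(key);
    }

    List<String> getUserJarBlobKeys() {
        return Collections.unmodifiableList(userJarBlobKeys);
    }
}

final class ServerSideJarUpload {
    private ServerSideJarUpload() {
    }

    // Uploads every previously received jar and records the resulting keys on the graph.
    static void uploadAndAttach(Map<String, byte[]> uploadedJars, JobGraphStub jobGraph, BlobStore store) {
        for (byte[] jar : uploadedJars.values()) {
            jobGraph.addUserJarBlobKey(store.put(jar));
        }
    }
}
```

The point of the design is that the client no longer needs direct access to the blob server; only the REST endpoint talks to it.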


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516985#comment-16516985
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6147
  
I will split this PR to address the various issues separately.




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511248#comment-16511248
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r19537
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
 				e);
 		}
 
-		return gateway.submitJob(jobGraph, timeout)
+		updateJarEntriesInJobGraph(jobGraph, requestBody.getUploadedJars(), log);
+		updateUserArtifactEntriesInJobGraph(jobGraph, requestBody.getUploadedArtifacts(), log);
+
+		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+		CompletableFuture<JobGraph> jobGraphFuture = blobServerPortFuture.thenApply(blobServerPort -> {
+			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+			final List<PermanentBlobKey> keys;
+			try {
+				keys = BlobClient.uploadFiles(address, config, jobGraph.getJobID(), jobGraph.getUserJars());
+				jobGraph.uploadUserArtifacts(address, config);
+			} catch (IOException ioe) {
+				log.error("Could not upload job jar files.", ioe);
+				throw new CompletionException(new RestHandlerException("Could not upload job jar files.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
+			}
+
+			for (PermanentBlobKey key : keys) {
+				jobGraph.addUserJarBlobKey(key);
+			}
+
+			return jobGraph;
+		});
+
+		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobGraphFuture
+			.thenCompose(finalizedJobGraph -> gateway.submitJob(jobGraph, timeout))
 			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+
+		CompletableFuture<Void> submissionCleanupFuture = submissionFuture.thenRun(requestBody::cleanup);
+
+		return submissionFuture.thenCombine(submissionCleanupFuture, (responseBody, ignored) -> responseBody);
+	}
+
+	/**
+	 * Updates the jar entries in the given JobGraph to refer to the uploaded jar files instead of client-local files.
+	 */
+	private static void updateJarEntriesInJobGraph(JobGraph jobGraph, Collection uploadedJars, Logger log) {
--- End diff --

Correct, this field is used in `JobGraph#uploadUserJars` to upload the jars 
to the blob server. Since this is now done on the server, but the original 
entries still point to client-local files, we have to update the entries.

If we move the upload out of the JobGraph, we can skip this step.

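The point above is that, once the upload happens on the server, the job graph's jar entries still name files on the client's disk and must be redirected to the copies the server received. A minimal sketch of that rewrite, assuming (hypothetically) that entries are matched to uploads by the client-side file name; it uses plain `java.nio.file.Path` lists rather than Flink's `JobGraph` API, and `JarEntryRewriter` is a made-up name.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: maps client-local jar paths onto server-side uploads.
final class JarEntryRewriter {
    private JarEntryRewriter() {
    }

    // Replaces each client-local jar path with the upload registered under the same file name.
    static List<Path> rewrite(List<Path> clientLocalJars, Map<String, Path> uploadsByName) {
        List<Path> rewritten = new ArrayList<>(clientLocalJars.size());
        for (Path localJar : clientLocalJars) {
            String name = localJar.getFileName().toString();
            Path uploaded = uploadsByName.get(name);
            if (uploaded == null) {
                throw new IllegalStateException("No upload found for jar " + name);
            }
            rewritten.add(uploaded);
        }
        return rewritten;
    }
}
```

As the comment notes, this step disappears entirely if the upload logic moves out of the job graph.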



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511244#comment-16511244
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195110485
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection jars,
+			Collection userArtifacts) throws IOException {
--- End diff --

I didn't mean to use Netty's `FileUpload` class, but rather to create our own, 
where we could specify the content type and other information. But we can also 
begin with `Collection` and encode the kind of file in the file name, 
for example.

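The suggestion is a small Flink-owned type that pairs a file with its content type, so that `RestClient` does not expose Netty's `FileUpload`. A minimal sketch of such a type follows; the names `FileToUpload` and `FileKindNaming`, and the `jar_`/`artifact_` prefix convention for the file-name fallback, are hypothetical.

```java
import java.nio.file.Path;
import java.util.Objects;

// Hypothetical Flink-owned upload descriptor: a file plus the content type to send it with.
final class FileToUpload {
    private final Path file;
    private final String contentType;

    FileToUpload(Path file, String contentType) {
        this.file = Objects.requireNonNull(file);
        this.contentType = Objects.requireNonNull(contentType);
    }

    Path getFile() {
        return file;
    }

    String getContentType() {
        return contentType;
    }
}

// Fallback from the comment: keep plain paths and encode the kind of file in the name.
final class FileKindNaming {
    static final String JAR_PREFIX = "jar_";
    static final String ARTIFACT_PREFIX = "artifact_";

    private FileKindNaming() {
    }

    static String encode(String kindPrefix, Path file) {
        return kindPrefix + file.getFileName();
    }

    static boolean isJar(String encodedName) {
        return encodedName.startsWith(JAR_PREFIX);
    }
}
```

The dedicated type keeps the metadata explicit; the naming fallback needs no new API but makes the server parse file names.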



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511204#comment-16511204
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195101445
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection jars,
+			Collection userArtifacts) throws IOException {
--- End diff --

So far we have managed not to expose Netty classes in the `RestClient` API, 
and I would prefer not to start now.

Could we not just pass a single `Collection` instead? The 
content-type would then always be `application/octet-stream`.

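With a single `Collection` of paths, every file part of the multipart request carries the same content type, `application/octet-stream`. The sketch below assembles such a `multipart/form-data` body by hand with the JDK only, to show what goes over the wire. The boundary, the `request` part name for the JSON payload, and the `file_N` part names are assumptions; Flink itself builds the body with Netty's `HttpPostRequestEncoder`, and unlike that encoder this sketch reads each file fully into memory for brevity.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;

// Hypothetical illustration of a multipart body where all files share one content type.
final class MultipartSketch {
    private static final String CRLF = "\r\n";

    private MultipartSketch() {
    }

    // Builds a multipart/form-data body: one JSON part followed by one octet-stream part per file.
    static byte[] build(String boundary, String jsonPayload, Collection<Path> files) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        write(out, "--" + boundary + CRLF
            + "Content-Disposition: form-data; name=\"request\"" + CRLF
            + "Content-Type: application/json" + CRLF + CRLF
            + jsonPayload + CRLF);

        int index = 0;
        for (Path file : files) {
            write(out, "--" + boundary + CRLF
                + "Content-Disposition: form-data; name=\"file_" + index++ + "\"; filename=\""
                + file.getFileName() + "\"" + CRLF
                // Every file uses the same generic content type, as suggested in the review.
                + "Content-Type: application/octet-stream" + CRLF + CRLF);
            out.write(Files.readAllBytes(file));
            write(out, CRLF);
        }
        // Closing delimiter marks the end of the multipart body.
        write(out, "--" + boundary + "--" + CRLF);
        return out.toByteArray();
    }

    private static void write(ByteArrayOutputStream out, String text) throws IOException {
        out.write(text.getBytes(StandardCharsets.UTF_8));
    }
}
```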



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511196#comment-16511196
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195098933
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection jars,
+			Collection userArtifacts) throws IOException {
+		Preconditions.checkNotNull(targetAddress);
+		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+		Preconditions.checkNotNull(messageHeaders);
+		Preconditions.checkNotNull(request);
+		Preconditions.checkNotNull(messageParameters);
+		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+		LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+		// serialize payload
+		StringWriter sw = new StringWriter();
+		objectMapper.writeValue(sw, request);
+		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+		// do not load file into memory, this can have weird side-effects and break functionality
+		HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+		HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+		httpRequest.headers()
+			.set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		// takes care of splitting the request into multiple parts
+		HttpPostRequestEncoder bodyRequestEncoder;
+		try {
+			bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
+
+			Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+			requestAttribute.setContent(payload);
+			bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+			addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
+			addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
+
+			bodyRequestEncoder.finalizeRequest();

I think we can _always_ send the request that the encoder returns.




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511236#comment-16511236
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195108792
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -581,6 +582,8 @@ public String toString() {
}
 
 	public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
--- End diff --

Yes




[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511229#comment-16511229
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195102896
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -119,6 +121,10 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 				}
 				//noinspection unchecked
 				request = (R) new FileUpload(path);
+			} else if (untypedResponseMessageHeaders == JobSubmitHeaders.getInstance()) {
+				final JobSubmitRequestBody jobSubmission = ctx.channel().attr(FileUploadHandler.SUBMITTED_JOB).get();
+				//noinspection unchecked
+				request = (R) jobSubmission;
--- End diff --

Not sure whether I would make the job submission a special case here. What 
if other requests allow uploading files in the future as well? 
Alternatively, we could make the attribute map or the set of uploaded files 
accessible to the `AbstractRestHandler` implementations. Then every handler 
could implement the support for uploaded files itself. What do you think?

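The alternative proposed here is to stop special-casing `JobSubmitHeaders` in `AbstractHandler` and instead hand the set of uploaded files to every handler, letting each one decide what to do with them. A minimal sketch of that shape with hypothetical types (`UploadAwareRequest`, `SubmitLikeHandler`); later diffs in this thread call `request.getUploadedFiles()`, but the classes below are illustrations, not Flink's.

```java
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical request wrapper: the body plus whatever files arrived with it.
final class UploadAwareRequest<B> {
    private final B body;
    private final Collection<Path> uploadedFiles;

    UploadAwareRequest(B body, Collection<Path> uploadedFiles) {
        this.body = body;
        this.uploadedFiles = Collections.unmodifiableCollection(uploadedFiles);
    }

    B getBody() {
        return body;
    }

    Collection<Path> getUploadedFiles() {
        return uploadedFiles;
    }
}

// Hypothetical handler: it, not the generic dispatch code, decides which uploads matter.
final class SubmitLikeHandler {
    List<String> jarNames(UploadAwareRequest<String> request) {
        return request.getUploadedFiles().stream()
            .map(path -> path.getFileName().toString())
            .filter(name -> name.endsWith(".jar"))
            .collect(Collectors.toList());
    }
}
```

The dispatch code stays unaware of which endpoints accept files; a handler that ignores uploads simply never calls `getUploadedFiles()`.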



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511234#comment-16511234
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195101162
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -91,19 +111,49 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
 
while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
-   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
-   final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
-   checkState(fileUpload.isCompleted());
-
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
-   fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   if 
(currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL()))
 {
--- End diff --

I think the `FileUploadHandler` should not know about the
`JobSubmitHandler`. Instead, it should only be responsible for receiving
uploaded files, storing them in a temp directory and then making them
accessible to a downstream handler (e.g. through an `Attribute` in the
`AttributeMap`). To defer the deserialization of the JSON part of the
payload, we could create a new `HttpRequest` which contains exactly the data
sent as a `MemoryAttribute` (the branch which matches
`InterfaceHttpData.HttpDataType.Attribute`).
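
A rough sketch of such an endpoint-agnostic upload step, assuming it only moves completed uploads into an upload directory and keeps the raw JSON attribute bytes for the downstream handler to deserialize. `UploadCollector` and its methods are hypothetical; Netty's `DiskFileUpload` and `Attribute` types are replaced by plain `Path` and `byte[]` to keep it self-contained.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Hypothetical, endpoint-agnostic collector: it never inspects the request URL
// and knows nothing about JobSubmitRequestBody.
final class UploadCollector {
    private final Path uploadDir;
    private final List<Path> uploadedFiles = new ArrayList<>();
    // Raw bytes of the JSON attribute; the target handler deserializes them later.
    private byte[] jsonPayload;

    UploadCollector(Path uploadDir) {
        this.uploadDir = uploadDir;
    }

    // Moves a completed upload into the upload directory under a collision-free name.
    void onFileUpload(Path completedUpload, String clientFileName) throws IOException {
        // Keep only the last element of the client-supplied name.
        String baseName = Paths.get(clientFileName).getFileName().toString();
        Path dest = uploadDir.resolve(UUID.randomUUID() + "_" + baseName);
        Files.move(completedUpload, dest);
        uploadedFiles.add(dest);
    }

    // Stores the JSON part untouched instead of deserializing it here.
    void onAttribute(byte[] json) {
        this.jsonPayload = json.clone();
    }

    List<Path> getUploadedFiles() {
        return Collections.unmodifiableList(uploadedFiles);
    }

    byte[] getJsonPayload() {
        return jsonPayload;
    }
}
```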


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511230#comment-16511230
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195107887
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -71,20 +108,192 @@ public void testSerializationFailureHandling() throws 
Exception {
 
@Test
public void testSuccessfulJobSubmission() throws Exception {
-   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   DispatcherGateway mockGateway = new 
JobGraphCapturingMockGateway();
GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
 
JobSubmitHandler handler = new JobSubmitHandler(

CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
-   Collections.emptyMap());
+   Collections.emptyMap(),
+   new Configuration());
 
JobGraph job = new JobGraph("testjob");
JobSubmitRequestBody request = new JobSubmitRequestBody(job);
 
handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway)
.get();
}
+
+   @Test
+   public void testJarHandling() throws Exception {
+   final String jarName = "jar";
+
+   JobGraphCapturingMockGateway jobGraphCapturingMockGateway = new 
JobGraphCapturingMockGateway();
+   GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT,
+   Collections.emptyMap(),
+   new Configuration());
+
+   Path tmp = TMP_FOLDER.newFolder().toPath();
+   Path clientStorageDirectory = 
Files.createDirectory(tmp.resolve("client-storage-directory"));
+   Path serverStorageDirectory = 
Files.createDirectory(tmp.resolve("server-storage-directory"));
+
+   Path jar = Paths.get(jarName);
+   Files.createFile(clientStorageDirectory.resolve(jar));
+   Files.createFile(serverStorageDirectory.resolve(jar));
+
+   JobGraph job = new JobGraph("testjob");
+   job.addJar(new org.apache.flink.core.fs.Path(jar.toUri()));
+   JobSubmitRequestBody serializedJobGraphBody = new 
JobSubmitRequestBody(job);
+   JobSubmitRequestBody request = new 
JobSubmitRequestBody(serializedJobGraphBody.serializedJobGraph, 
Collections.singletonList(serverStorageDirectory.resolve(jar)), 
Collections.emptyList(), serverStorageDirectory);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), jobGraphCapturingMockGateway)
+   .get();
+
+   JobGraph submittedJobGraph = 
jobGraphCapturingMockGateway.jobGraph;
+   List userJars = 
submittedJobGraph.getUserJars();
+
+   // ensure we haven't changed the total number of jars
+   Assert.assertEquals(1, userJars.size());
+
+   // this entry should be changed, a replacement jar exists in 
the server storage directory
+   Assert.assertEquals(new 
org.apache.flink.core.fs.Path(serverStorageDirectory.resolve(jar).toUri()), 
userJars.get(0));
--- End diff --

I think updating `JobGraph#userJars` and `JobGraph#userArtifacts` is not
really necessary. Maybe we should even mark them `transient` to emphasize
that they won't be transmitted. Given that, I don't think we need these
tests.
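
To illustrate what marking such fields `transient` means, here is a hypothetical, heavily simplified `SketchJobGraph` (not Flink's `JobGraph`). After a Java serialization round trip the transient field comes back as `null`, while the serialized blob keys survive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JobGraph; only illustrates the effect of `transient`.
final class SketchJobGraph implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    // Client-local jar paths: only needed to decide what to upload, so not serialized.
    transient List<String> userJars = new ArrayList<>();
    // Blob keys are what the cluster needs, so they are serialized.
    final List<String> userJarBlobKeys = new ArrayList<>();

    SketchJobGraph(String name) {
        this.name = name;
    }

    // Serializes and deserializes the graph, as a submission over the wire would.
    static SketchJobGraph roundTrip(SketchJobGraph graph) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(graph);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchJobGraph) in.readObject();
        }
    }
}
```

Field initializers do not run during deserialization, which is why the transient list is `null` rather than empty on the receiving side.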


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511228#comment-16511228
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195090429
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -45,14 +50,41 @@
@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
public final byte[] serializedJobGraph;
 
+   private final java.nio.file.Path storageDirectory;
--- End diff --

This could also be a `File` since it must be local, right?


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511223#comment-16511223
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195088315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
e);
}
 
-   return gateway.submitJob(jobGraph, timeout)
+   updateJarEntriesInJobGraph(jobGraph, 
requestBody.getUploadedJars(), log);
+   updateUserArtifactEntriesInJobGraph(jobGraph, 
requestBody.getUploadedArtifacts(), log);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture jobGraphFuture = 
blobServerPortFuture.thenApply(blobServerPort -> {
+   final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+   final List keys;
+   try {
+   keys = BlobClient.uploadFiles(address, config, 
jobGraph.getJobID(), jobGraph.getUserJars());
+   jobGraph.uploadUserArtifacts(address, config);
+   } catch (IOException ioe) {
+   log.error("Could not upload job jar files.", 
ioe);
+   throw new CompletionException(new 
RestHandlerException("Could not upload job jar files.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+   }
+
+   for (PermanentBlobKey key : keys) {
+   jobGraph.addUserJarBlobKey(key);
+   }
+
+   return jobGraph;
+   });
+
+   CompletableFuture submissionFuture = 
jobGraphFuture
+   .thenCompose(finalizedJobGraph -> 
gateway.submitJob(jobGraph, timeout))
.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+
+   CompletableFuture submissionCleanupFuture = 
submissionFuture.thenRun(requestBody::cleanup);
+
+   return submissionFuture.thenCombine(submissionCleanupFuture, 
(responseBody, ignored) -> responseBody);
+   }
+
+   /**
+* Updates the jar entries in the given JobGraph to refer to the 
uploaded jar files instead of client-local files.
+*/
+   private static void updateJarEntriesInJobGraph(JobGraph jobGraph, 
Collection uploadedJars, Logger log) {
--- End diff --

Why do we need to update the jar file names in the `JobGraph`? I thought 
`JobGraph#userJars` is only used by the client to learn which jars to upload to 
the cluster.
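
A sketch of the client-side role described above, assuming `userJars` is only consulted to decide which files to upload. The `uploader` function is a hypothetical placeholder for a BlobServer upload that returns a key; under this assumption only the returned keys would need to reach the cluster.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical client-side flow: the jar list is read once to drive uploads,
// and only the resulting blob keys are kept for the cluster.
final class ClientJarUpload {
    static List<String> uploadUserJars(List<Path> userJars, Function<Path, String> uploader) {
        List<String> blobKeys = new ArrayList<>();
        for (Path jar : userJars) {
            // The uploader stands in for a BlobServer upload returning a key.
            blobKeys.add(uploader.apply(jar));
        }
        return blobKeys;
    }
}
```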


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511224#comment-16511224
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195087298
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
e);
}
 
-   return gateway.submitJob(jobGraph, timeout)
+   updateJarEntriesInJobGraph(jobGraph, 
requestBody.getUploadedJars(), log);
+   updateUserArtifactEntriesInJobGraph(jobGraph, 
requestBody.getUploadedArtifacts(), log);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture jobGraphFuture = 
blobServerPortFuture.thenApply(blobServerPort -> {
+   final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+   final List keys;
+   try {
+   keys = BlobClient.uploadFiles(address, config, 
jobGraph.getJobID(), jobGraph.getUserJars());
+   jobGraph.uploadUserArtifacts(address, config);
+   } catch (IOException ioe) {
+   log.error("Could not upload job jar files.", 
ioe);
+   throw new CompletionException(new 
RestHandlerException("Could not upload job jar files.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+   }
+
+   for (PermanentBlobKey key : keys) {
+   jobGraph.addUserJarBlobKey(key);
+   }
+
+   return jobGraph;
+   });
+
+   CompletableFuture submissionFuture = 
jobGraphFuture
+   .thenCompose(finalizedJobGraph -> 
gateway.submitJob(jobGraph, timeout))
.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+
+   CompletableFuture submissionCleanupFuture = 
submissionFuture.thenRun(requestBody::cleanup);
+
+   return submissionFuture.thenCombine(submissionCleanupFuture, 
(responseBody, ignored) -> responseBody);
+   }
+
+   /**
+* Updates the jar entries in the given JobGraph to refer to the 
uploaded jar files instead of client-local files.
+*/
+   private static void updateJarEntriesInJobGraph(JobGraph jobGraph, 
Collection uploadedJars, Logger log) {
+   // the existing entries still reference client-local jars
+   jobGraph.getUserJars().clear();
+   for (Path jar : uploadedJars) {
+   log.debug("Adding jar {} to JobGraph({}).", jar, 
jobGraph.getJobID());
+   jobGraph.addJar(new 
org.apache.flink.core.fs.Path(jar.toUri()));
+   }
+   }
+
+   /**
+* Updates the user-artifact entries in the given JobGraph to refer to 
the uploaded artifacts instead of client-local artifacts.
+*/
+   private static void updateUserArtifactEntriesInJobGraph(JobGraph 
jobGraph, Collection uploadedArtifacts, Logger log) {
+   // match the names of uploaded files to the names stored in the 
distributed cache entries to find entries we have to override
+
+   // create a new map from file name -> distributed cache map 
entry
+   Map> remappedArtifactEntries = 
jobGraph.getUserArtifacts().entrySet().stream()
+   .collect(Collectors.toMap(
+   entry -> new 
org.apache.flink.core.fs.Path(entry.getValue().filePath).getName(),
+   entry -> Tuple2.of(entry.getKey(), 
entry.getValue())
+   ));
+   // create a new map from file name -> local file
+   Map mappedUploadedArtifacts = 
uploadedArtifacts.stream()
+   .collect(Collectors.toMap(
+   artifact -> new 
org.apache.flink.core.fs.Path(artifact.toUri()).getName(),
+   artifact -> artifact
+   ));
+
+   if (!remappedArtifactEntries.isEmpty() && 
!mappedUploadedArtifacts.isEmpty()) {
+   jobGraph.getUserArtifacts().clear();
+   for (Map.Entry> entry : 
remappedArtifactEntries.entrySet()) {
+   String fileName = entry.getKey();
+   String dcEntryName = entry.getValue().f0;
+   DistributedCache.DistributedCacheEntry dcEntry 
= entry.getValue().f1;
+
+   

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511226#comment-16511226
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195089124
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
e);
}
 
-   return gateway.submitJob(jobGraph, timeout)
+   updateJarEntriesInJobGraph(jobGraph, 
requestBody.getUploadedJars(), log);
+   updateUserArtifactEntriesInJobGraph(jobGraph, 
requestBody.getUploadedArtifacts(), log);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture jobGraphFuture = 
blobServerPortFuture.thenApply(blobServerPort -> {
+   final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+   final List keys;
+   try {
+   keys = BlobClient.uploadFiles(address, config, 
jobGraph.getJobID(), jobGraph.getUserJars());
+   jobGraph.uploadUserArtifacts(address, config);
--- End diff --

Instead of calling `updateUserArtifactEntriesInJobGraph` and then
`jobGraph.uploadUserArtifacts`, we could simply take the files from
`requestBody.getUploadedArtifacts()`, upload them to the `BlobServer`, and add
the blob keys to the `JobGraph` so that it knows where to retrieve the user
artifacts from.
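
A sketch of that simplification, assuming a hypothetical `ArtifactUploader` interface in place of the real `BlobClient` calls: each received artifact is uploaded once and its blob key is recorded under the artifact's file name, so the client-side distributed cache entries never have to be rewritten. `ArtifactRegistration` is likewise a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical abstraction over a BlobServer upload; the real BlobClient API differs.
interface ArtifactUploader {
    String upload(Path file) throws IOException;
}

final class ArtifactRegistration {
    // Uploads each received artifact and returns (artifact name -> blob key) entries
    // that could be stored in the JobGraph so the cluster knows where to fetch them.
    static Map<String, String> uploadAndRegister(Collection<Path> uploadedArtifacts, ArtifactUploader uploader)
            throws IOException {
        Map<String, String> blobKeysByName = new LinkedHashMap<>();
        for (Path artifact : uploadedArtifacts) {
            blobKeysByName.put(artifact.getFileName().toString(), uploader.upload(artifact));
        }
        return blobKeysByName;
    }
}
```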


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511222#comment-16511222
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195089388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
e);
}
 
-   return gateway.submitJob(jobGraph, timeout)
+   updateJarEntriesInJobGraph(jobGraph, 
requestBody.getUploadedJars(), log);
+   updateUserArtifactEntriesInJobGraph(jobGraph, 
requestBody.getUploadedArtifacts(), log);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture jobGraphFuture = 
blobServerPortFuture.thenApply(blobServerPort -> {
+   final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+   final List keys;
+   try {
+   keys = BlobClient.uploadFiles(address, config, 
jobGraph.getJobID(), jobGraph.getUserJars());
+   jobGraph.uploadUserArtifacts(address, config);
+   } catch (IOException ioe) {
+   log.error("Could not upload job jar files.", 
ioe);
+   throw new CompletionException(new 
RestHandlerException("Could not upload job jar files.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
--- End diff --

We could add the cause `ioe` to the `RestHandlerException`.
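
A minimal sketch of chaining the cause, assuming a hypothetical `SketchRestHandlerException` that accepts a `Throwable` (whether Flink's `RestHandlerException` offers such a constructor is not shown in this diff). `JarUpload` and `UploadStep` are also hypothetical. The original `IOException` stays reachable via `getCause()` after the future fails.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for a REST handler exception that accepts a cause.
final class SketchRestHandlerException extends Exception {
    private static final long serialVersionUID = 1L;
    private final int httpStatus;

    SketchRestHandlerException(String message, int httpStatus, Throwable cause) {
        // Passing the cause keeps the original IOException and its stack trace.
        super(message, cause);
        this.httpStatus = httpStatus;
    }

    int getHttpStatus() {
        return httpStatus;
    }
}

// Hypothetical upload step that may fail with an IOException.
interface JarUpload {
    String run(int blobServerPort) throws IOException;
}

final class UploadStep {
    static CompletableFuture<String> uploadJars(CompletableFuture<Integer> portFuture, JarUpload upload) {
        return portFuture.thenApply(port -> {
            try {
                return upload.run(port);
            } catch (IOException ioe) {
                // 500 mirrors INTERNAL_SERVER_ERROR in the reviewed code.
                throw new CompletionException(
                    new SketchRestHandlerException("Could not upload job jar files.", 500, ioe));
            }
        });
    }
}
```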


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511233#comment-16511233
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195089986
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -28,35 +33,51 @@
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.util.ScalaUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import akka.actor.AddressFromURIString;
+import org.slf4j.Logger;
+
 import javax.annotation.Nonnull;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler {
 
+   private final Configuration config;
+
public JobSubmitHandler(
CompletableFuture localRestAddress,
GatewayRetriever 
leaderRetriever,
Time timeout,
-   Map headers) {
+   Map headers,
+   Configuration config) {
super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+   this.config = config;
}
 
@Override
protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   final JobSubmitRequestBody requestBody = 
request.getRequestBody();
JobGraph jobGraph;
-   try {
-   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   try (ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(requestBody.serializedJobGraph))) {
--- End diff --

Should we also send the `serializedJobGraph` as part of the POST request
body instead of passing it through Jackson, which encodes it in base64?
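
For a sense of the overhead: Jackson writes `byte[]` fields as base64 strings by default, and padded base64 turns every 3 input bytes into 4 characters, about a third more data than sending the bytes as a raw multipart part. The sketch below computes the padded length and cross-checks it against `java.util.Base64`; `Base64Overhead` is a hypothetical helper.

```java
import java.util.Base64;

// Size cost of embedding binary data in JSON as padded base64.
final class Base64Overhead {
    // Every started group of 3 input bytes becomes 4 output characters.
    static int encodedLength(int rawBytes) {
        return 4 * ((rawBytes + 2) / 3);
    }

    // Cross-check using the JDK encoder (padding enabled by default).
    static int actualEncodedLength(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw).length();
    }
}
```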


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511225#comment-16511225
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195087491
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -65,7 +86,96 @@ public JobSubmitHandler(
e);
}
 
-   return gateway.submitJob(jobGraph, timeout)
+   updateJarEntriesInJobGraph(jobGraph, 
requestBody.getUploadedJars(), log);
+   updateUserArtifactEntriesInJobGraph(jobGraph, 
requestBody.getUploadedArtifacts(), log);
+
+   CompletableFuture blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
+
+   CompletableFuture jobGraphFuture = 
blobServerPortFuture.thenApply(blobServerPort -> {
+   final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
+   final List keys;
+   try {
+   keys = BlobClient.uploadFiles(address, config, 
jobGraph.getJobID(), jobGraph.getUserJars());
+   jobGraph.uploadUserArtifacts(address, config);
+   } catch (IOException ioe) {
+   log.error("Could not upload job jar files.", 
ioe);
+   throw new CompletionException(new 
RestHandlerException("Could not upload job jar files.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+   }
+
+   for (PermanentBlobKey key : keys) {
+   jobGraph.addUserJarBlobKey(key);
+   }
+
+   return jobGraph;
+   });
+
+   CompletableFuture submissionFuture = 
jobGraphFuture
+   .thenCompose(finalizedJobGraph -> 
gateway.submitJob(jobGraph, timeout))
.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+
+   CompletableFuture submissionCleanupFuture = 
submissionFuture.thenRun(requestBody::cleanup);
+
+   return submissionFuture.thenCombine(submissionCleanupFuture, 
(responseBody, ignored) -> responseBody);
+   }
+
+   /**
+* Updates the jar entries in the given JobGraph to refer to the 
uploaded jar files instead of client-local files.
+*/
+   private static void updateJarEntriesInJobGraph(JobGraph jobGraph, 
Collection uploadedJars, Logger log) {
+   // the existing entries still reference client-local jars
+   jobGraph.getUserJars().clear();
+   for (Path jar : uploadedJars) {
+   log.debug("Adding jar {} to JobGraph({}).", jar, 
jobGraph.getJobID());
+   jobGraph.addJar(new 
org.apache.flink.core.fs.Path(jar.toUri()));
+   }
+   }
+
+   /**
+* Updates the user-artifact entries in the given JobGraph to refer to 
the uploaded artifacts instead of client-local artifacts.
+*/
+   private static void updateUserArtifactEntriesInJobGraph(JobGraph 
jobGraph, Collection uploadedArtifacts, Logger log) {
+   // match the names of uploaded files to the names stored in the 
distributed cache entries to find entries we have to override
+
+   // create a new map from file name -> distributed cache map 
entry
+   Map> remappedArtifactEntries = 
jobGraph.getUserArtifacts().entrySet().stream()
+   .collect(Collectors.toMap(
+   entry -> new 
org.apache.flink.core.fs.Path(entry.getValue().filePath).getName(),
+   entry -> Tuple2.of(entry.getKey(), 
entry.getValue())
+   ));
+   // create a new map from file name -> local file
+   Map mappedUploadedArtifacts = 
uploadedArtifacts.stream()
+   .collect(Collectors.toMap(
+   artifact -> new 
org.apache.flink.core.fs.Path(artifact.toUri()).getName(),
+   artifact -> artifact
+   ));
+
+   if (!remappedArtifactEntries.isEmpty() && 
!mappedUploadedArtifacts.isEmpty()) {
+   jobGraph.getUserArtifacts().clear();
+   for (Map.Entry> entry : 
remappedArtifactEntries.entrySet()) {
+   String fileName = entry.getKey();
+   String dcEntryName = entry.getValue().f0;
+   DistributedCache.DistributedCacheEntry dcEntry 
= entry.getValue().f1;
+
+   

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511231#comment-16511231
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195101806
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -91,19 +111,49 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
 
while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
-   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
-   final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
-   checkState(fileUpload.isCompleted());
-
-   final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + fileUpload.getFilename()));
-   fileUpload.renameTo(dest.toFile());
-   ctx.channel().attr(UPLOADED_FILE).set(dest);
+   if (currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL())) {
+   if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload = (DiskFileUpload) data;
+   checkState(fileUpload.isCompleted());
+   LOG.trace("Received job-submit file upload. attribute:{} fileName:{}.", fileUpload.getName(), fileUpload.getFilename());
+
+   Path dest;
+   if (data.getName().startsWith(HTTP_ATTRIBUTE_JARS)) {
+   dest = currentJobSubmitRequestBuffer.getJarDir().resolve(fileUpload.getFilename());
+   fileUpload.renameTo(dest.toFile());
+   currentJobSubmitRequestBuffer.addJar(fileUpload.getFile().toPath());
+   } else if (data.getName().startsWith(HTTP_ATTRIBUTE_ARTIFACTS)) {
+   dest = currentJobSubmitRequestBuffer.getArtifactDir().resolve(fileUpload.getFilename());
+   fileUpload.renameTo(dest.toFile());
+   currentJobSubmitRequestBuffer.addUserArtifact(fileUpload.getFile().toPath());
+   } else {
+   LOG.warn("Received unexpected FileUpload that will be ignored. attribute:{} fileName:{}.", data.getName(), fileUpload.getFilename());
+   fileUpload.delete();
+   }
+   } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) data;
+   final byte[] requestJson = request.get();
+   JobSubmitRequestBody jobSubmitRequestBody = RestMapperUtils.getStrictObjectMapper().readValue(requestJson, JobSubmitHeaders.getInstance().getRequestClass());
+   currentJobSubmitRequestBuffer.setJobGraph(jobSubmitRequestBody.serializedJobGraph);
+   }
+   } else {
+   if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload = (DiskFileUpload) data;
+   checkState(fileUpload.isCompleted());
+
+   final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
+   "_" + fileUpload.getFilename()));
+   fileUpload.renameTo(dest.toFile());
+   ctx.channel().attr(UPLOADED_FILE).set(dest);
+   }
}

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511232#comment-16511232
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195099673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -91,19 +111,49 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 
while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
-   if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
-   final DiskFileUpload fileUpload = (DiskFileUpload) data;
-   checkState(fileUpload.isCompleted());
-
-   final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + fileUpload.getFilename()));
-   fileUpload.renameTo(dest.toFile());
-   ctx.channel().attr(UPLOADED_FILE).set(dest);
+   if (currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL())) {
+   if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload = (DiskFileUpload) data;
+   checkState(fileUpload.isCompleted());
+   LOG.trace("Received job-submit file upload. attribute:{} fileName:{}.", fileUpload.getName(), fileUpload.getFilename());
+
+   Path dest;
+   if (data.getName().startsWith(HTTP_ATTRIBUTE_JARS)) {
+   dest = currentJobSubmitRequestBuffer.getJarDir().resolve(fileUpload.getFilename());
+   fileUpload.renameTo(dest.toFile());
+   currentJobSubmitRequestBuffer.addJar(fileUpload.getFile().toPath());
+   } else if (data.getName().startsWith(HTTP_ATTRIBUTE_ARTIFACTS)) {
+   dest = currentJobSubmitRequestBuffer.getArtifactDir().resolve(fileUpload.getFilename());
+   fileUpload.renameTo(dest.toFile());
+   currentJobSubmitRequestBuffer.addUserArtifact(fileUpload.getFile().toPath());
+   } else {
+   LOG.warn("Received unexpected FileUpload that will be ignored. attribute:{} fileName:{}.", data.getName(), fileUpload.getFilename());
+   fileUpload.delete();
+   }
+   } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) data;
+   final byte[] requestJson = request.get();
+   JobSubmitRequestBody jobSubmitRequestBody = RestMapperUtils.getStrictObjectMapper().readValue(requestJson, JobSubmitHeaders.getInstance().getRequestClass());
+   currentJobSubmitRequestBuffer.setJobGraph(jobSubmitRequestBody.serializedJobGraph);
+   }
--- End diff --

I think we are mixing a lot of handler-specific knowledge into this 
handler here and thereby creating a very strong coupling between multiple 
components. Moreover, this handler seems to deserialize JSON, which is really 
the responsibility of the `AbstractHandler`.
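
To make the suggested separation concrete, here is a minimal sketch in plain Java, with ordinary method calls standing in for the Netty pipeline. All class names (`UploadedRequest`, `GenericUploadStage`, `JobSubmitStage`) are hypothetical and not Flink API. The upload stage only records the stored files and the raw request bytes; interpreting the body and deciding what each file means is left to the endpoint-specific stage.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical container: what the generic upload stage extracted, left uninterpreted.
final class UploadedRequest {
    private final byte[] rawJsonBody;
    private final List<Path> uploadedFiles;

    UploadedRequest(byte[] rawJsonBody, List<Path> uploadedFiles) {
        this.rawJsonBody = rawJsonBody;
        this.uploadedFiles = Collections.unmodifiableList(new ArrayList<>(uploadedFiles));
    }

    byte[] rawJsonBody() { return rawJsonBody; }

    List<Path> uploadedFiles() { return uploadedFiles; }
}

// Generic stage: knows nothing about job submission, jars or artifacts, and never parses JSON.
final class GenericUploadStage {
    private final List<Path> files = new ArrayList<>();
    private byte[] body = new byte[0];

    void onFile(Path storedFile) { files.add(storedFile); }

    void onAttribute(byte[] content) { body = content; }

    UploadedRequest finish() { return new UploadedRequest(body, files); }
}

// Endpoint-specific stage: the only place that deserializes the body and interprets the files.
final class JobSubmitStage {
    static String handle(UploadedRequest request) {
        String json = new String(request.rawJsonBody(), StandardCharsets.UTF_8);
        return "body=" + json + ", files=" + request.uploadedFiles().size();
    }
}
```

With this split, adding another upload-capable endpoint would not require touching the upload stage.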


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511227#comment-16511227
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195091137
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -45,14 +50,41 @@
@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
public final byte[] serializedJobGraph;
--- End diff --

I think it would be a good idea to send the `JobGraph` as binary data 
instead of encoding it in Base64 to send it as part of the json request.
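
For context on the size argument, a small sketch assuming the graph is Java-serialized into a `byte[]` and that the JSON mapper writes `byte[]` fields as Base64 strings (Jackson's default). Base64 turns every 3 bytes into 4 characters, so the JSON body carries roughly a third more data, which also has to exist in memory as a string. `JobGraphEncodingCost` is a hypothetical helper, and an `int[]` stands in for a real `JobGraph`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

final class JobGraphEncodingCost {
    // Java-serialize an object, roughly what happens to a JobGraph before it is sent.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Size of the payload once embedded as a Base64 string (padding included).
    static int base64Size(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw).length();
    }

    // Closed form: every started group of 3 input bytes becomes 4 output characters.
    static int expectedBase64Size(int rawLength) {
        return 4 * ((rawLength + 2) / 3);
    }
}
```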


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511219#comment-16511219
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195107099
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -206,4 +200,49 @@ public void testDirectoryCleanUp() throws Exception {
}
}
}
+
+   @Test
+   public void testCompression() throws IOException {
+   final java.nio.file.Path compressDir = temporaryFolder.newFolder("compressDir").toPath();
+   final java.nio.file.Path extractDir = temporaryFolder.newFolder("extractDir").toPath();
+
+   final java.nio.file.Path originalDir = Paths.get("rootDir");
+   final java.nio.file.Path emptySubDir = originalDir.resolve("emptyDir");
+   final java.nio.file.Path fullSubDir = originalDir.resolve("fullDir");
+   final java.nio.file.Path file1 = originalDir.resolve("file1");
+   final java.nio.file.Path file2 = originalDir.resolve("file2");
+   final java.nio.file.Path file3 = fullSubDir.resolve("file3");
+
+   Files.createDirectory(compressDir.resolve(originalDir));
+   Files.createDirectory(compressDir.resolve(emptySubDir));
+   Files.createDirectory(compressDir.resolve(fullSubDir));
+   Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), compressDir.resolve(file1));
+   Files.createFile(compressDir.resolve(file2));
+   Files.copy(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), compressDir.resolve(file3));
+
+   final Path zip = FileCache.compressDirectory(new Path(compressDir.resolve(originalDir).toString()));
+
+   FileCache.expandDirectory(new File(zip.getPath()), extractDir.toFile(), false);
+
+   assertTrue(Files.exists(extractDir.resolve(originalDir)));
--- End diff --

I'll try to come up with something. The linked solution does not cover 
empty directories and does not detect additional files in one direction (and 
I'd prefer if we didn't have to run it twice with reverse arguments).
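
One way to get a single-pass comparison that also covers empty directories is sketched below. It is an assumption, not the approach the PR ended up with: collect the relative paths of both trees (marking directories with a trailing `/`), compare the two sets, which catches extra entries on either side, and only then compare file contents. `DirectoryComparison` is a hypothetical helper.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

final class DirectoryComparison {
    // All entries below root as relative paths; directories (including empty ones) end with "/".
    static Set<String> relativeEntries(Path root) throws IOException {
        Set<String> entries = new TreeSet<>();
        try (Stream<Path> walk = Files.walk(root)) {
            walk.filter(p -> !p.equals(root)).forEach(p -> {
                String rel = root.relativize(p).toString().replace('\\', '/');
                entries.add(Files.isDirectory(p) ? rel + "/" : rel);
            });
        }
        return entries;
    }

    // True if both trees have exactly the same entries and every regular file has equal content.
    static boolean sameTree(Path a, Path b) throws IOException {
        Set<String> entriesA = relativeEntries(a);
        if (!entriesA.equals(relativeEntries(b))) {
            return false; // an extra or missing entry on either side is detected here
        }
        for (String rel : entriesA) {
            if (rel.endsWith("/")) {
                continue; // directories have no content to compare
            }
            if (!Arrays.equals(Files.readAllBytes(a.resolve(rel)), Files.readAllBytes(b.resolve(rel)))) {
                return false;
            }
        }
        return true;
    }
}
```

Because set equality is symmetric, one call covers both directions.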


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511150#comment-16511150
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195083682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -89,7 +89,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) msg;
currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

The `JobSubmitHandlerTest` never transmits any files via netty. The failure 
would occur in the `FileUploadHandler` which is currently completely untested.

I'll try to find a test that can reproduce this.


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511135#comment-16511135
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195080163
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
 ---
@@ -25,4 +25,6 @@
  */
 public class RestConstants {
--- End diff --

Yes, good point.


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511134#comment-16511134
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195080145
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -581,6 +582,8 @@ public String toString() {
}
 
public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
+   zipUserArtifacts();
--- End diff --

I think it would be better to change these parts instead of pushing this 
logic into the `JobGraph`.
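
A minimal sketch of what keeping the zipping outside of `JobGraph` could look like: a standalone helper, here called `ArtifactZipper` (hypothetical), that the submission code would call before uploading. It writes explicit directory entries so that empty directories survive the round trip.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper owned by the submission path, so JobGraph stays a plain job description.
final class ArtifactZipper {
    static Path zipDirectory(Path dir, Path targetZip) throws IOException {
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(dir)) {
            entries = walk.sorted().collect(Collectors.toList()); // parents sort before children
        }
        // Entry names are relative to the parent, so the archive contains "dirName/...".
        Path base = dir.getParent() == null ? dir : dir.getParent();
        try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(targetZip))) {
            for (Path p : entries) {
                String name = base.relativize(p).toString().replace('\\', '/');
                if (Files.isDirectory(p)) {
                    // Explicit directory entry: keeps empty directories in the archive.
                    zip.putNextEntry(new ZipEntry(name + "/"));
                } else {
                    zip.putNextEntry(new ZipEntry(name));
                    Files.copy(p, zip);
                }
                zip.closeEntry();
            }
        }
        return targetZip;
    }
}
```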


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511130#comment-16511130
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195079921
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -89,7 +89,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) msg;
currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

Could you create a test case which reproduces the problem? Running the 
`JobSubmitHandlerTest` without this change did not reproduce the problem. I 
would like to see whether it is indeed a Netty bug or if we are simply doing 
something wrong which we cover up with this fix.


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511128#comment-16511128
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195079190
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -273,26 +269,8 @@ public Path call() throws IOException {
final File file = blobService.getFile(jobID, blobKey);
--- End diff --

Then I would suggest updating the JavaDoc to state that only zipped 
directories are copied.
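
As an illustration of the contract such a JavaDoc could describe, here is a hypothetical `BlobMaterializer.materialize` (not the actual `CopyFromBlobProcess` code): zipped directories are expanded into the target, plain files are copied unchanged. The check rejecting entries that would escape the target directory is an addition of this sketch.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

final class BlobMaterializer {
    /**
     * Materializes a file fetched from the blob store.
     *
     * <p>Directories are never transferred as-is; they must have been zipped before upload.
     * If {@code isZippedDirectory} is true the archive is expanded into {@code targetDir},
     * otherwise the file is copied unchanged to {@code targetDir/fileName}.
     *
     * @return the expansion root or the path of the copied file
     */
    static Path materialize(Path blobFile, boolean isZippedDirectory, Path targetDir, String fileName) throws IOException {
        Files.createDirectories(targetDir);
        if (!isZippedDirectory) {
            Path target = targetDir.resolve(fileName);
            Files.copy(blobFile, target, StandardCopyOption.REPLACE_EXISTING);
            return target;
        }
        Path root = targetDir.toAbsolutePath().normalize();
        try (InputStream in = Files.newInputStream(blobFile);
             ZipInputStream zip = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) {
                    throw new IOException("Zip entry escapes target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out); // keeps empty directories
                } else {
                    Files.createDirectories(out.getParent());
                    // Reads only the current entry; the ZipInputStream stays open.
                    Files.copy(zip, out, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
        return root;
    }
}
```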


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511126#comment-16511126
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195078844
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -253,13 +249,13 @@ public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
private static class CopyFromBlobProcess implements Callable {
 
private final PermanentBlobKey blobKey;
-   private final Path target;
+   private final File target;
private final boolean isDirectory;
private final boolean isExecutable;
private final JobID jobID;
private final PermanentBlobService blobService;
 
-   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) throws Exception {
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, File target) throws Exception {
this.isExecutable = e.isExecutable;
this.isDirectory = e.isZipped;
--- End diff --

Sounds good


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1655#comment-1655
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195067790
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -313,38 +305,32 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
-
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
-   try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-   }
-
-   for (PermanentBlobKey key : keys) {
-   jobGraph.addUserJarBlobKey(key);
-   }
-
-   return jobGraph;
-   });
-
-   CompletableFuture submissionFuture = jobUploadFuture.thenCompose(
-   (JobGraph jobGraphToSubmit) -> {
-   log.info("Submitting job graph.");
-
+   CompletableFuture submissionFuture = getWebMonitorBaseUrl()
+   .thenCompose(webMonitorBaseUrl -> {
try {
-   return sendRequest(
+   jobGraph.zipUserArtifacts();
+
+   Collection localUserArtifacts = jobGraph.getUserArtifacts().values().stream()
+   .map(entry -> new Path(entry.filePath))
+   .filter(path -> {
+   try {
+   return !path.getFileSystem().isDistributedFS();
+   } catch (Exception e) {
+   log.warn("Could not determine whether {} is a local file. The file may not be accessible via the Distributed Cache.", path, e);
+   // filesystem isn't accessible from the client or FS class not present
+   return false;
+   }
+   })
+   .collect(Collectors.toList());
+
+   return restClient.sendRequest(
--- End diff --

Let's add retries for this call by adding a `sendRetriableRequest(...)` 
with the correct signature. Even better would be a `sendRequest(...)` with 
the correct signature that dispatches to `sendRetriableRequest`.
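
A minimal sketch of a retrying wrapper, assuming the request is re-created on every attempt through a `Supplier` (presumably necessary here, since a multipart body backed by temporary files cannot simply be resent). `Retries.retry` is a hypothetical helper, not Flink API; a real version would likely add a delay between attempts and retry only on transient errors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class Retries {
    // Invokes the operation until it succeeds or `retries` additional attempts are used up.
    // The supplier is called again on each attempt, so a fresh request is built every time.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation, int retries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> operation, int retriesLeft, CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed future.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                attempt(operation, retriesLeft - 1, result);
            } else {
                result.completeExceptionally(error); // give up, surface the last failure
            }
        });
    }
}
```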


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511124#comment-16511124
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195073725
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request,
+   Collection jars,
+   Collection userArtifacts) throws IOException {
+   Preconditions.checkNotNull(targetAddress);
+   Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+   Preconditions.checkNotNull(messageHeaders);
+   Preconditions.checkNotNull(request);
+   Preconditions.checkNotNull(messageParameters);
+   Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+   String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+   LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+   // serialize payload
+   StringWriter sw = new StringWriter();
+   objectMapper.writeValue(sw, request);
+   ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+   // do not load file into memory, this can have weird side-effects and break functionality
+   HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+   HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+   httpRequest.headers()
+   .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+   .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+   // takes care of splitting the request into multiple parts
+   HttpPostRequestEncoder bodyRequestEncoder;
+   try {
+   bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
+
+   Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+   requestAttribute.setContent(payload);
+   bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+   addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
+   addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
+
+   bodyRequestEncoder.finalizeRequest();
+   } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
+   return org.apache.flink.runtime.concurrent.FutureUtils.completedExceptionally(e);
--- End diff --

nit: Could import `FutureUtils`.


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1658#comment-1658
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195075668
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request,
+   Collection jars,
+   Collection userArtifacts) throws IOException {
+   Preconditions.checkNotNull(targetAddress);
+   Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+   Preconditions.checkNotNull(messageHeaders);
+   Preconditions.checkNotNull(request);
+   Preconditions.checkNotNull(messageParameters);
+   Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+   String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+   LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+   // serialize payload
+   StringWriter sw = new StringWriter();
+   objectMapper.writeValue(sw, request);
+   ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+   // do not load file into memory, this can have weird side-effects and break functionality
+   HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+   HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+   httpRequest.headers()
+   .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+   .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+   // takes care of splitting the request into multiple parts
+   HttpPostRequestEncoder bodyRequestEncoder;
+   try {
+   bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
+
+   Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+   requestAttribute.setContent(payload);
+   bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+   addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
+   addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
+
+   bodyRequestEncoder.finalizeRequest();
+   } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
+   return org.apache.flink.runtime.concurrent.FutureUtils.completedExceptionally(e);
+   }
+
+   return createChannelFuture(targetAddress, targetPort)
+   .thenComposeAsync(
+   channel -> {
+   ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+   CompletableFuture future = handler.getJsonFuture();
+
+   channel.writeAndFlush(httpRequest);
+   // if this is false the jars/artifacts are so small that they were already included in the initial request
+   if (bodyRequestEncoder.isChunked()) {
+   channel.writeAndFlush(bodyRequestEncoder);
+   }
+
+   // release data and remove temporary files if they were created
+   bodyRequestEncoder.cleanFiles();
+
+   return future;
+   },
+   executor)
+   .thenComposeAsync(
+   (JsonResponse 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1659#comment-1659
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195076266
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
--- End diff --

This method and the other `sendRequest` method contain a lot of duplicate 
code. Can we simplify this by having a `sendRequestInternal` method which takes 
a `RequestBodyProvider`? For `POST`, `PUT` and `OPTIONS` this could be an 
`HttpPostRequestEncoder`, and for all other verbs a `VoidBodyProvider`.


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> the {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511122#comment-16511122
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195075210
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+       String targetAddress,
+       int targetPort,
+       M messageHeaders,
+       U messageParameters,
+       R request,
+       Collection jars,
+       Collection userArtifacts) throws IOException {
+   Preconditions.checkNotNull(targetAddress);
+   Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+   Preconditions.checkNotNull(messageHeaders);
+   Preconditions.checkNotNull(request);
+   Preconditions.checkNotNull(messageParameters);
+   Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+   String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+   LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+   // serialize payload
+   StringWriter sw = new StringWriter();
+   objectMapper.writeValue(sw, request);
+   ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+   // do not load file into memory, this can have weird side-effects and break functionality
+   HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+   HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+   httpRequest.headers()
+       .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+       .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+   // takes care of splitting the request into multiple parts
+   HttpPostRequestEncoder bodyRequestEncoder;
+   try {
+       bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
--- End diff --

Must it be strictly a multi-part request?



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1656#comment-1656
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195068264
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java ---
@@ -25,4 +25,6 @@
  */
 public class RestConstants {
--- End diff --

This could be an `enum`. That way we would get all the nice singleton 
properties for free.
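
One possible reading of the suggestion, as a sketch: a constants holder declared as an `enum` with no constants cannot be instantiated at all. The type name `RestConstantsSketch` is hypothetical; the constant names mirror `RestConstants.JAR_CONTENT_TYPE` and `RestConstants.BINARY_CONTENT_TYPE` from the diff context in this thread, and their string values are assumptions:

```java
/** Hypothetical constants holder: an enum without constants has no instances at all. */
enum RestConstantsSketch {
    ; // no constants: enum types cannot be instantiated with `new` or via reflection

    // Assumed values; the real Flink constants may differ.
    static final String JAR_CONTENT_TYPE = "application/java-archive";
    static final String BINARY_CONTENT_TYPE = "application/octet-stream";
}
```

Compared with a final class plus private constructor, the enum form needs no boilerplate to prevent instantiation.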



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511123#comment-16511123
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195076996
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+       String targetAddress,
+       int targetPort,
+       M messageHeaders,
+       U messageParameters,
+       R request,
+       Collection jars,
+       Collection userArtifacts) throws IOException {
--- End diff --

It would be great if the `RestClient` did not know about the distinction 
between `jars` and `userArtifacts`. Instead it should be enough to provide this 
method a collection of `FileUpload` objects which contain the path and some 
meta information to make sense of the different files contained in the body on 
the receiving side.
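
A minimal sketch of that idea. `FileUpload` is the name used in the comment, but its fields (a `Path` plus a content type as the meta information) and the `GenericUploadClient.describeParts` helper are assumptions; the point is that the client loops over uploads without knowing which ones are jars:

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/** Hypothetical: a file to attach to a request plus the metadata the receiver needs. */
final class FileUpload {
    private final Path file;
    private final String contentType;

    FileUpload(Path file, String contentType) {
        this.file = Objects.requireNonNull(file);
        this.contentType = Objects.requireNonNull(contentType);
    }

    Path getFile() {
        return file;
    }

    String getContentType() {
        return contentType;
    }
}

final class GenericUploadClient {

    /**
     * Hypothetical replacement for the separate jars/userArtifacts parameters.
     * Returns one "name;content-type" description per part instead of encoding a real request.
     */
    static List<String> describeParts(Collection<FileUpload> uploads) {
        List<String> parts = new ArrayList<>();
        for (FileUpload upload : uploads) {
            parts.add(upload.getFile().getFileName() + ";" + upload.getContentType());
        }
        return parts;
    }
}
```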



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1657#comment-1657
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195075314
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+       String targetAddress,
+       int targetPort,
+       M messageHeaders,
+       U messageParameters,
+       R request,
+       Collection jars,
+       Collection userArtifacts) throws IOException {
+   Preconditions.checkNotNull(targetAddress);
+   Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+   Preconditions.checkNotNull(messageHeaders);
+   Preconditions.checkNotNull(request);
+   Preconditions.checkNotNull(messageParameters);
+   Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+   String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+   LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+   // serialize payload
+   StringWriter sw = new StringWriter();
+   objectMapper.writeValue(sw, request);
+   ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+   // do not load file into memory, this can have weird side-effects and break functionality
+   HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+   HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+   httpRequest.headers()
+       .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+       .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+   // takes care of splitting the request into multiple parts
+   HttpPostRequestEncoder bodyRequestEncoder;
+   try {
+       bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
+
+       Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+       requestAttribute.setContent(payload);
+       bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+       addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
+       addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
+
+       bodyRequestEncoder.finalizeRequest();
--- End diff --

If it's not a multi-part request, then we should send the `HttpRequest` 
which is returned here.
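
A sketch of the write order the comment describes, assuming hypothetical stand-ins (`BodyEncoder`, `RequestWriter`) that only mirror the two Netty `HttpPostRequestEncoder` methods involved: `finalizeRequest()` returns the request that must actually be sent, and `isChunked()` reports whether the body still has to be streamed afterwards. A real implementation would write to a Netty `Channel` instead of collecting frames in a list:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the two encoder methods the comment relies on. */
interface BodyEncoder {
    /** The request to send; when the body is small it is already inlined here. */
    String finalizeRequest();

    /** True if the body is not inlined and must be streamed after the request head. */
    boolean isChunked();
}

final class RequestWriter {

    /** Returns the objects that would be written to the channel, in order. */
    static List<Object> framesToWrite(BodyEncoder encoder) {
        List<Object> frames = new ArrayList<>();
        // Send what finalizeRequest() returns, not the request originally passed to the encoder:
        // in the non-chunked case only the returned request carries the body.
        frames.add(encoder.finalizeRequest());
        if (encoder.isChunked()) {
            // the encoder itself then acts as the chunked input for the body
            frames.add(encoder);
        }
        return frames;
    }
}
```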



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511121#comment-16511121
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195077673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
 
+   public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+       String targetAddress,
+       int targetPort,
+       M messageHeaders,
+       U messageParameters,
+       R request,
+       Collection jars,
+       Collection userArtifacts) throws IOException {
+   Preconditions.checkNotNull(targetAddress);
+   Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+   Preconditions.checkNotNull(messageHeaders);
+   Preconditions.checkNotNull(request);
+   Preconditions.checkNotNull(messageParameters);
+   Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+   String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+   LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+   // serialize payload
+   StringWriter sw = new StringWriter();
+   objectMapper.writeValue(sw, request);
+   ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+   // do not load file into memory, this can have weird side-effects and break functionality
+   HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+   HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+   httpRequest.headers()
+       .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+       .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+   // takes care of splitting the request into multiple parts
+   HttpPostRequestEncoder bodyRequestEncoder;
+   try {
+       bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
+
+       Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+       requestAttribute.setContent(payload);
+       bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+       addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
+       addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
--- End diff --

If we can send arbitrary files to the server and let the respective handler 
make sense of what is in what file, then we would also not need to introduce 
the different attributes.
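
A sketch of the receiving side under that approach, assuming (hypothetically) that the JSON request body lists which uploaded file names are jars; the handler then sorts the uploaded files itself instead of relying on separate HTTP attributes. `JobSubmitFiles` and `classify` are illustrative names, not Flink API:

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical handler-side view: all uploaded files arrive without a per-kind attribute. */
final class JobSubmitFiles {
    final List<Path> jars = new ArrayList<>();
    final List<Path> artifacts = new ArrayList<>();

    /**
     * Assumes the request body names the jar files; every other uploaded
     * file is treated as a user artifact.
     */
    static JobSubmitFiles classify(Collection<Path> uploadedFiles, Set<String> jarFileNames) {
        JobSubmitFiles result = new JobSubmitFiles();
        for (Path file : uploadedFiles) {
            String name = file.getFileName().toString();
            if (jarFileNames.contains(name)) {
                result.jars.add(file);
            } else {
                result.artifacts.add(file);
            }
        }
        return result;
    }
}
```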



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511120#comment-16511120
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195067963
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -313,38 +305,32 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = sendRequest(BlobServerPortHeaders.getInstance());
-
-   CompletableFuture jobUploadFuture = portFuture.thenCombine(
-       getDispatcherAddress(),
-       (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-           final int blobServerPort = response.port;
-           final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-           final List keys;
-           try {
-               log.info("Uploading jar files.");
-               keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-               jobGraph.uploadUserArtifacts(address, flinkConfig);
-           } catch (IOException ioe) {
-               throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-           }
-
-           for (PermanentBlobKey key : keys) {
-               jobGraph.addUserJarBlobKey(key);
-           }
-
-           return jobGraph;
-       });
-
-   CompletableFuture submissionFuture = jobUploadFuture.thenCompose(
-       (JobGraph jobGraphToSubmit) -> {
-           log.info("Submitting job graph.");
-
+   CompletableFuture submissionFuture = getWebMonitorBaseUrl()
+       .thenCompose(webMonitorBaseUrl -> {
            try {
-               return sendRequest(
+               jobGraph.zipUserArtifacts();
--- End diff --

I think zipping should not be the responsibility of the `JobGraph`.
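
As a sketch of where the zipping could live instead, here is a hypothetical standalone helper built on `java.util.zip`. `DirectoryZipper` is not a Flink class; the assumption is that the submitting code would call it and only record the resulting zip path in the job graph:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper that owns the zipping step instead of the JobGraph. */
final class DirectoryZipper {

    private DirectoryZipper() {}

    /** Zips all regular files below sourceDir into targetZip, using paths relative to sourceDir. */
    static void zipDirectory(Path sourceDir, Path targetZip) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(sourceDir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(targetZip);
             ZipOutputStream zip = new ZipOutputStream(out)) {
            for (Path file : files) {
                // zip entry names always use '/' regardless of the platform separator
                String entryName = sourceDir.relativize(file).toString().replace('\\', '/');
                zip.putNextEntry(new ZipEntry(entryName));
                Files.copy(file, zip);
                zip.closeEntry();
            }
        }
    }
}
```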



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511092#comment-16511092
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195070782
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -581,6 +582,8 @@ public String toString() {
}
 
public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
--- End diff --

This also applies to `uploadUserJars`, yes?





[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511083#comment-16511083
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195069312
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -253,13 +249,13 @@ public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
private static class CopyFromBlobProcess implements Callable {
 
private final PermanentBlobKey blobKey;
-   private final Path target;
+   private final File target;
private final boolean isDirectory;
private final boolean isExecutable;
private final JobID jobID;
private final PermanentBlobService blobService;
 
-   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) throws Exception {
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, File target) throws Exception {
this.isExecutable = e.isExecutable;
this.isDirectory = e.isZipped;
--- End diff --

How about `isZippedDirectory`? `isDirectory` is misleading since the file 
is not actually a directory.



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511073#comment-16511073
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195068231
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -273,26 +269,8 @@ public Path call() throws IOException {
final File file = blobService.getFile(jobID, blobKey);
--- End diff --

It does so partially: we no longer delete the zip retrieved from the 
blob service.

We _could_ fix the ownership problem by copying simple files as well to the 
storage-directory, but that seems wasteful.
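
A sketch of the ownership rule discussed here, assuming the cache unpacks zipped directories into its own storage directory: the zip obtained from the blob service is only read, never deleted, because the blob service owns it. `ZipExtractor` is hypothetical, and the entry-path check guards against zip entries that would escape the target directory:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical: unpacks a zip owned by someone else into a directory we own. */
final class ZipExtractor {

    private ZipExtractor() {}

    static void extract(Path zipOwnedByBlobService, Path storageDir) throws IOException {
        Path root = storageDir.toAbsolutePath().normalize();
        try (ZipFile zip = new ZipFile(zipOwnedByBlobService.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                Path target = root.resolve(entry.getName()).normalize();
                // reject entries such as "../x" that would escape the storage directory
                if (!target.startsWith(root)) {
                    throw new IOException("Zip entry outside target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(target);
                } else {
                    Files.createDirectories(target.getParent());
                    try (InputStream in = zip.getInputStream(entry)) {
                        Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
                    }
                }
            }
        }
        // Deliberately no Files.delete(zipOwnedByBlobService): the blob service owns that file.
    }
}
```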



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511064#comment-16511064
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195065961
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java ---
@@ -40,6 +41,14 @@
 @Public
 public class DistributedCache {
 
+   /**
+    * An entry for a single file or directory that should be cached.
+    *
+    * Entries have different semantics for local directories depending on where we are in the job-submission process.
+    * After registration through the API {@code filePath} denotes the original directory.
+    * Before the job is submitted to the cluster directories are zipped, at which point {@code filePath} denotes the path to the local zip.
+    * After the upload to the cluster, {@code filePath} denotes the (server-side) copy of the zip.
+    */
public static class DistributedCacheEntry implements Serializable {
--- End diff --

Actually, you just added JavaDocs stating all the different purposes of 
this class. I really think that we should split this class up into dedicated 
classes.
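
One hypothetical way to split the class along the three stages described in the Javadoc above, with each stage carrying only the fields it needs and borrowing the `isZippedDirectory` name suggested earlier in the thread. None of these classes exist in Flink, and `String` stands in for `PermanentBlobKey`:

```java
import java.nio.file.Path;
import java.util.Objects;

/** Stage 1 (hypothetical): what the user registered through the API, possibly a directory. */
final class RegisteredCacheEntry {
    final Path originalPath;
    final boolean isExecutable;

    RegisteredCacheEntry(Path originalPath, boolean isExecutable) {
        this.originalPath = Objects.requireNonNull(originalPath);
        this.isExecutable = isExecutable;
    }

    /** Moves to stage 2 once the client has zipped the directory (or kept the plain file). */
    LocalCacheEntry toLocal(Path localFileOrZip, boolean isZippedDirectory) {
        return new LocalCacheEntry(localFileOrZip, isZippedDirectory, isExecutable);
    }
}

/** Stage 2 (hypothetical): the local file or zip that will be uploaded. */
final class LocalCacheEntry {
    final Path localPath;
    final boolean isZippedDirectory;
    final boolean isExecutable;

    LocalCacheEntry(Path localPath, boolean isZippedDirectory, boolean isExecutable) {
        this.localPath = Objects.requireNonNull(localPath);
        this.isZippedDirectory = isZippedDirectory;
        this.isExecutable = isExecutable;
    }

    /** Moves to stage 3 once the upload has produced a blob key. */
    UploadedCacheEntry toUploaded(String blobKey) {
        return new UploadedCacheEntry(blobKey, isZippedDirectory, isExecutable);
    }
}

/** Stage 3 (hypothetical): only what the cluster needs to fetch the server-side copy. */
final class UploadedCacheEntry {
    final String blobKey; // stand-in for Flink's PermanentBlobKey
    final boolean isZippedDirectory;
    final boolean isExecutable;

    UploadedCacheEntry(String blobKey, boolean isZippedDirectory, boolean isExecutable) {
        this.blobKey = Objects.requireNonNull(blobKey);
        this.isZippedDirectory = isZippedDirectory;
        this.isExecutable = isExecutable;
    }
}
```

Because each stage is its own type, a method that expects an uploaded entry can no longer be handed one whose path still points at the original local directory.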



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511065#comment-16511065
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195066210
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java ---
@@ -40,6 +41,14 @@
 @Public
 public class DistributedCache {
 
+   /**
+    * An entry for a single file or directory that should be cached.
+    *
+    * Entries have different semantics for local directories depending on where we are in the job-submission process.
+    * After registration through the API {@code filePath} denotes the original directory.
+    * Before the job is submitted to the cluster directories are zipped, at which point {@code filePath} denotes the path to the local zip.
+    * After the upload to the cluster, {@code filePath} denotes the (server-side) copy of the zip.
+    */
public static class DistributedCacheEntry implements Serializable {
--- End diff --

As a neat side effect, we could also refactor how this information is sent 
to the cluster, namely changing it such that it is no longer serialized into 
the `Configuration`.


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511066#comment-16511066
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195065351
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
 ---
@@ -40,6 +41,14 @@
 @Public
 public class DistributedCache {
 
+   /**
+    * An entry for a single file or directory that should be cached.
+    *
+    * Entries have different semantics for local directories depending on where we are in the job-submission process.
+    * After registration through the API {@code filePath} denotes the original directory.
+    * Before the job is submitted to the cluster directories are zipped, at which point {@code filePath} denotes the path to the local zip.
+    * After the upload to the cluster, {@code filePath} denotes the (server-side) copy of the zip.
+    */
public static class DistributedCacheEntry implements Serializable {
--- End diff --

It might be out of scope for this PR, but I think the `DistributedCacheEntry` 
mixes too many responsibilities. On the one hand, it is used to transport cache 
entry information like `isZipped`, `blobKey` and `isExecutable`, which is only 
relevant for the job submission. On the other hand, it also contains 
information about which files to transmit to the cluster at job creation 
time. I think it would be a good idea to separate these responsibilities. As a 
side effect, we would no longer have `nullable` fields such as the `blobKey` in 
this class.
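One possible way to split the responsibilities named above, sketched under assumptions: `CacheFileRegistration` and `ShippedCacheEntry` are hypothetical class names, and a plain `String` stands in for the blob key. The registration only describes what to transmit; the shipped entry is built after the upload, so its blob key can be required rather than nullable.

```java
import java.util.Objects;

/**
 * Hypothetical: what the user registered through the API, i.e. which file to
 * transmit to the cluster. It carries no submission state.
 */
final class CacheFileRegistration {
    final String name;
    final String filePath;
    final boolean executable;

    CacheFileRegistration(String name, String filePath, boolean executable) {
        this.name = Objects.requireNonNull(name, "name");
        this.filePath = Objects.requireNonNull(filePath, "filePath");
        this.executable = executable;
    }
}

/**
 * Hypothetical: the submission-time information shipped with the job. It is
 * only created once the upload has produced a key, so blobKey is never null.
 */
final class ShippedCacheEntry {
    final String name;
    final String blobKey; // stands in for the real blob key type
    final boolean zipped;
    final boolean executable;

    private ShippedCacheEntry(String name, String blobKey, boolean zipped, boolean executable) {
        this.name = name;
        this.blobKey = Objects.requireNonNull(blobKey, "blobKey");
        this.zipped = zipped;
        this.executable = executable;
    }

    /** Builds the shipped entry from a registration after its upload succeeded. */
    static ShippedCacheEntry afterUpload(CacheFileRegistration registration, String blobKey, boolean zipped) {
        return new ShippedCacheEntry(registration.name, blobKey, zipped, registration.executable);
    }
}
```

The nullability question disappears by construction: an object that needs the key simply cannot exist before the key does.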


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511067#comment-16511067
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195066482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -581,6 +582,8 @@ public String toString() {
}
 
public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
+   zipUserArtifacts();
--- End diff --

I don't know that part of the code; my guess was that we want to keep the 
transition from `Plan -> JobGraph` straightforward.
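In the diff above, `uploadUserArtifacts` now starts by calling `zipUserArtifacts()`, so directories are packed before anything is sent to the blob server. The body of `zipUserArtifacts` is not shown here, so the following is only a sketch assuming directories are packed with `java.util.zip`; `ZipSketch.zipDirectory` is a hypothetical helper, not Flink's implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final class ZipSketch {
    /**
     * Hypothetical helper: packs every regular file below {@code dir} into
     * {@code zipFile}, using paths relative to {@code dir}. {@code zipFile}
     * must not lie inside {@code dir}.
     */
    static void zipDirectory(Path dir, Path zipFile) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Sorting makes the entry order deterministic across runs.
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipFile))) {
            for (Path file : files) {
                // Zip entry names always use '/', whatever the platform separator is.
                String entryName = dir.relativize(file).toString()
                        .replace(dir.getFileSystem().getSeparator(), "/");
                zos.putNextEntry(new ZipEntry(entryName));
                Files.copy(file, zos);
                zos.closeEntry();
            }
        }
    }
}
```

Doing this inside the upload step rather than during the `Plan -> JobGraph` translation keeps that translation free of file-system side effects, which matches the guess in the comment above.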


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511059#comment-16511059
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6147#discussion_r195065850
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -89,7 +89,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) msg;
currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

The failure was triggered every time I attempted to upload anything, but it 
may very well depend on the payload size.

Your analysis is mostly correct; what's missing is that the decoder switches 
into the `EPILOGUE` state when it is offered a `LastHttpContent`.
If this last message is not empty, the exception is not thrown since data is 
still available, which `hasNext` checks via 
`this.bodyListHttpDataRank >= this.bodyListHttpData.size()`.
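The `hasNext` behaviour described above can be illustrated with a small, self-contained model. This is a sketch only: `ToyPostDecoder` and `EndOfDataSignal` are hypothetical stand-ins that mirror just the `EPILOGUE` check quoted in the comment, not Netty's `HttpPostRequestDecoder` or its multipart parsing. With an empty last content, `hasNext()` throws right away because no unread data remains; with a non-empty one it first returns `true`. The `drain()` loop shows one defensive option, treating the end-of-data signal as normal termination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the decoder's end-of-data exception. */
final class EndOfDataSignal extends RuntimeException {
    EndOfDataSignal() {
        super("end of data");
    }
}

/** Toy model of the hasNext() check quoted above; not Netty's implementation. */
final class ToyPostDecoder {
    private final List<String> bodyListHttpData = new ArrayList<>();
    private int bodyListHttpDataRank = 0;
    private boolean epilogue = false;

    /** Offers the parts decoded from one chunk; the last chunk switches the model into EPILOGUE. */
    void offer(boolean lastContent, String... parts) {
        bodyListHttpData.addAll(Arrays.asList(parts));
        if (lastContent) {
            epilogue = true;
        }
    }

    boolean hasNext() {
        // In EPILOGUE with nothing left to read, signal end-of-data instead of returning false.
        if (epilogue && bodyListHttpDataRank >= bodyListHttpData.size()) {
            throw new EndOfDataSignal();
        }
        return bodyListHttpDataRank < bodyListHttpData.size();
    }

    String next() {
        return bodyListHttpData.get(bodyListHttpDataRank++);
    }

    /** Drains all available parts, treating the end-of-data signal as normal termination. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        try {
            while (hasNext()) {
                out.add(next());
            }
        } catch (EndOfDataSignal endOfData) {
            // Expected once the last content has been offered and every part has been consumed.
        }
        return out;
    }
}
```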

