[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4742


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r143160231
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -216,4 +219,25 @@ protected void finalizeCluster() {
public int getMaxSlots() {
return 0;
}
+
+   private static final class RestClusterClientThreadFactory implements 
ThreadFactory {
--- End diff --

fixed


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r143158921
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -216,4 +219,25 @@ protected void finalizeCluster() {
public int getMaxSlots() {
return 0;
}
+
+   private static final class RestClusterClientThreadFactory implements 
ThreadFactory {
--- End diff --

I think you could also use the `ExecutorThreadFactory` for this purpose.
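
For context, a thread factory that gives pool threads a recognizable name could look roughly like the sketch below. It uses only `java.util.concurrent`; `NamedThreadFactory` and `nameOfFirstWorkerThread` are hypothetical names for illustration, and this is neither the `RestClusterClientThreadFactory` from the PR nor Flink's `ExecutorThreadFactory`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadPoolSketch {

    /** Hypothetical factory: names threads "<prefix>-<n>" and marks them as daemon threads. */
    public static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    }

    /** Runs one task on a fresh pool and returns the name of the thread that executed it. */
    public static String nameOfFirstWorkerThread(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(4, new NamedThreadFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Task did not complete", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfFirstWorkerThread("RestClusterClientIOThread"));
    }
}
```

Reusing an existing factory avoids maintaining a second class that does the same thing; the sketch only shows what such a factory does.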


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142943570
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

we could also add a dedicated resolve method for each parameter that a 
particular class defines, but at that point we're duplicating the individual 
parameter methods.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142933167
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

I know the HttpObjectAggregator will throw an exception, and I _think_ 
it will just be logged server-side. We can't really change that except by 
modifying the aggregator (but I have some WIP to replace it anyway).

I'll add the client-side size check.
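
A client-side size check of this kind might look roughly like the sketch below. The class name, the `serializeWithLimit` helper and the 1024-byte limit are assumptions for illustration only; the real limit would have to match whatever the server-side aggregator is configured with, and the encoded request body may be larger than the raw byte array.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class RequestSizeCheckSketch {

    /** Hypothetical limit; not taken from the PR. */
    public static final int MAX_CONTENT_LENGTH_BYTES = 1024;

    /** Serializes the payload and fails fast on the client if it exceeds maxBytes. */
    public static byte[] serializeWithLimit(Serializable payload, int maxBytes) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] serialized = bytes.toByteArray();
        if (serialized.length > maxBytes) {
            throw new IllegalArgumentException(
                "Serialized payload is " + serialized.length
                    + " bytes, which exceeds the limit of " + maxBytes + " bytes.");
        }
        return serialized;
    }

    public static void main(String[] args) {
        System.out.println(serializeWithLimit("small payload", MAX_CONTENT_LENGTH_BYTES).length);
    }
}
```

Failing at construction time gives the caller a descriptive error instead of an exception that is only logged on the server.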


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142930472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

well they aren't really internal fields, at least i didn't intend them to 
be. The client has to resolve the parameters somehow, so we either have to add 
a custom resolve method to every `MessageParameters` class (which will make for 
an odd API when creating sub-classes), or provide access to each parameter 
(either directly or through a getter). I opted for the direct approach since it 
makes it obvious that we are in fact modifying the `MessageParameters` object.
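
The direct-access approach can be sketched as follows. `SketchParameter`, `TerminationParametersSketch`, the keys and the URL shape are simplified, hypothetical stand-ins rather than Flink's actual `MessageParameters` classes; only the two field names are taken from the diff above.

```java
public class MessageParametersSketch {

    /** Hypothetical, simplified parameter that can be resolved exactly once. */
    public static final class SketchParameter {
        private final String key;
        private String value;

        public SketchParameter(String key) {
            this.key = key;
        }

        public void resolve(String value) {
            if (this.value != null) {
                throw new IllegalStateException("Parameter '" + key + "' was already resolved.");
            }
            this.value = value;
        }

        public String getValue() {
            if (value == null) {
                throw new IllegalStateException("Parameter '" + key + "' is unresolved.");
            }
            return value;
        }
    }

    /** Exposes its parameters as public final fields so that callers resolve them directly. */
    public static final class TerminationParametersSketch {
        public final SketchParameter jobPathParameter = new SketchParameter("jobid");
        public final SketchParameter terminationModeQueryParameter = new SketchParameter("mode");

        /** Hypothetical URL shape, for illustration only. */
        public String toUrl() {
            return "/jobs/" + jobPathParameter.getValue()
                + "?mode=" + terminationModeQueryParameter.getValue();
        }
    }

    public static void main(String[] args) {
        TerminationParametersSketch params = new TerminationParametersSketch();
        params.jobPathParameter.resolve("abc123");
        params.terminationModeQueryParameter.resolve("cancel");
        System.out.println(params.toUrl());
    }
}
```

The fields themselves are `final`, so callers can resolve a parameter but cannot swap it out for a different one.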


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142929069
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+   private static final GatewayRetriever 
mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
--- End diff --

Renamed to testJobSubmitCancelStop. It verifies that the client properly 
sends out requests to the corresponding handlers.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142899120
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private final RestClusterClientConfiguration 
restClusterClientConfiguration;
+   private final RestClient restClient;
+   private final ExecutorService executorService = 
Executors.newFixedThreadPool(4);
--- End diff --

Can we give these threads a proper name? Something like 
"RestClusterClientIOThread".


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142899387
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A configuration object for {@link RestClusterClient}s.
+ */
+public final class RestClusterClientConfiguration {
+
+   private final String blobServerAddress;
+
+   private final RestClientConfiguration restEndpointConfiguration;
--- End diff --

rename to `restClientConfiguration`?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142897184
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -610,25 +633,15 @@ public void cancel(JobID jobId) throws Exception {
 * failed. That might be due to an I/O problem, ie, the 
job-manager is unreachable.
 */
public void stop(final JobID jobId) throws Exception {
-   final ActorGateway jobManagerGateway = getJobManagerGateway();
+   final ActorGateway jobManager = getJobManagerGateway();
 
-   final Future response;
-   try {
-   response = jobManagerGateway.ask(new 
JobManagerMessages.StopJob(jobId), timeout);
-   } catch (final Exception e) {
-   throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
-   }
+   Future response = jobManager.ask(new 
JobManagerMessages.StopJob(jobId), timeout);
 
-   final Object result = Await.result(response, timeout);
+   final Object rc = Await.result(response, timeout);
 
-   if (result instanceof JobManagerMessages.StoppingSuccess) {
-   log.info("Job stopping with ID " + jobId + " 
succeeded.");
-   } else if (result instanceof 
JobManagerMessages.StoppingFailure) {
-   final Throwable t = 
((JobManagerMessages.StoppingFailure) result).cause();
-   log.info("Job stopping with ID " + jobId + " failed.", 
t);
-   throw new Exception("Failed to stop the job because of 
\n" + t.getMessage());
-   } else {
-   throw new Exception("Unknown message received while 
stopping: " + result.getClass().getName());
+   if (rc instanceof JobManagerMessages.StoppingFailure) {
+   throw new Exception("Stopping the job with ID " + jobId 
+ " failed.",
+   ((JobManagerMessages.StoppingFailure) 
rc).cause());
--- End diff --

The unknown response type exception was lost
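
To illustrate the point, a response check that keeps the fallback branch could be sketched as below. `StoppingSuccess` and `StoppingFailure` here are hypothetical stand-ins for the `JobManagerMessages` types, and `handleStopResponse` is an assumed helper, not a method of `ClusterClient`.

```java
public class StopResponseSketch {

    /** Hypothetical stand-in for the success message. */
    public static final class StoppingSuccess {}

    /** Hypothetical stand-in for the failure message, carrying the cause. */
    public static final class StoppingFailure {
        private final Throwable cause;

        public StoppingFailure(Throwable cause) {
            this.cause = cause;
        }

        public Throwable cause() {
            return cause;
        }
    }

    /** Returns normally on success; throws on failure and on any unexpected response type. */
    public static void handleStopResponse(Object response, String jobId) throws Exception {
        if (response instanceof StoppingSuccess) {
            return;
        } else if (response instanceof StoppingFailure) {
            throw new Exception(
                "Stopping the job with ID " + jobId + " failed.",
                ((StoppingFailure) response).cause());
        } else {
            // Without this branch an unexpected reply would be treated as success.
            throw new Exception(
                "Unknown message received while stopping: " + response.getClass().getName());
        }
    }

    public static void main(String[] args) throws Exception {
        handleStopResponse(new StoppingSuccess(), "job-1");
        System.out.println("stop acknowledged");
    }
}
```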


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142900393
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+   private static final GatewayRetriever 
mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = 
RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new 
TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new 
TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+ 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142902251
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   return FutureUtils.completedExceptionally(new 
RestHandlerException(
--- End diff --

The `RestHandlerException` could be thrown right away without wrapping it 
into a future.
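
A minimal sketch of the suggested shape, assuming the handler method declares a checked exception: `HandlerException` and `handleRequest` below are hypothetical stand-ins, not Flink's `RestHandlerException` or the actual `JobSubmitHandler`, and the payload is a plain `Serializable` rather than a `JobGraph`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

public class ThrowDirectlySketch {

    /** Hypothetical stand-in for a checked handler exception. */
    public static final class HandlerException extends Exception {
        public HandlerException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Deserialization failures are thrown directly; only the async result uses the future. */
    public static CompletableFuture<String> handleRequest(byte[] serialized) throws HandlerException {
        final Object payload;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            payload = in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new HandlerException("Failed to deserialize payload.", e);
        }
        return CompletableFuture.completedFuture(String.valueOf(payload));
    }

    /** Helper that produces valid input for the sketch. */
    public static byte[] serialize(Serializable payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws HandlerException {
        System.out.println(handleRequest(serialize("graph")).join());
    }
}
```

Since the method already declares the exception, throwing it keeps the synchronous failure path separate from the asynchronous one.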


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142907890
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

Alright, will a proper exception be thrown when this limit is exceeded? 
Otherwise we could check right away when creating this request.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142900355
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+   private static final GatewayRetriever 
mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = 
RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new 
TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new 
TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+ 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142907629
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

Hmm, what about passing these values to the `JobTerminationMessageParameters` 
constructor? It doesn't feel entirely right that we access internal fields to 
resolve them.
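
A minimal sketch of the constructor-based alternative suggested above. All class and method names here (`PathParamSketch`, `TerminationParamsSketch`, `describe`) are hypothetical stand-ins, not Flink's actual `MessageParameters` API; the point is only that the fields can stay private when the values are resolved at construction time.

```java
import java.util.Objects;

// Hypothetical stand-in for a single resolvable parameter (not a Flink class).
final class PathParamSketch {
    private final String key;
    private String value;

    PathParamSketch(String key) {
        this.key = key;
    }

    // Resolving stores the concrete value; null is rejected early.
    void resolve(String newValue) {
        this.value = Objects.requireNonNull(newValue, "value for " + key);
    }

    String getKey() {
        return key;
    }

    String getValue() {
        return value;
    }
}

// Hypothetical parameter container: the fields stay private and are
// resolved from constructor arguments instead of by outside callers.
final class TerminationParamsSketch {
    private final PathParamSketch jobId = new PathParamSketch("jobid");
    private final PathParamSketch mode = new PathParamSketch("mode");

    TerminationParamsSketch(String jobIdValue, String modeValue) {
        jobId.resolve(jobIdValue);
        mode.resolve(modeValue);
    }

    // Read-only view of the resolved parameters.
    String describe() {
        return jobId.getKey() + "=" + jobId.getValue()
            + ", " + mode.getKey() + "=" + mode.getValue();
    }
}
```

With this shape a caller would write `new TerminationParamsSketch("abc", "cancel")` and never touch the parameter objects directly.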


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142902303
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+
+   public JobSubmitHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
--- End diff --

True.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142897388
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private final RestClusterClientConfiguration 
restClusterClientConfiguration;
+   private final RestClient restClient;
+   private final ExecutorService executorService = 
Executors.newFixedThreadPool(4);
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.restClusterClientConfiguration = configuration;
+   this.restClient = new 
RestClient(configuration.getRestEndpointConfiguration(), executorService);
+   }
+
+   @Override
+   public void shutdown() {
+   try {
+   // we only call this for legacy reasons to shutdown 
components that are started in the ClusterClient constructor
+   super.shutdown();
+   } catch (Exception e) {
+   log.error("An error occurred during the client 
shutdown.", e);
+   }
+   this.restClient.shutdown(Time.seconds(5));
+   this.executorService.shutdown();
--- End diff --

Better to use `Executors.gracefulShutdown` here.
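
For context, a rough sketch of what a graceful shutdown usually involves, written against plain `java.util.concurrent` only. This is not the implementation of the helper named above (its exact signature is not shown in this thread); `GracefulShutdownSketch` and `shutdownGracefully` are hypothetical names. The difference from a bare `shutdown()` call is that the caller waits for queued work and falls back to `shutdownNow()` when the timeout expires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdownSketch {

    /**
     * Stops accepting new tasks, waits up to the timeout for running and
     * queued tasks, and forces termination if they do not finish in time.
     *
     * @return true if the executor terminated without being forced
     */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timeout elapsed: interrupt whatever is still running.
            executor.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        pool.submit(() -> System.out.println("already submitted work still runs"));
        boolean clean = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
        System.out.println("terminated cleanly: " + clean);
    }
}
```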


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142900022
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private final RestClusterClientConfiguration 
restClusterClientConfiguration;
+   private final RestClient restClient;
+   private final ExecutorService executorService = 
Executors.newFixedThreadPool(4);
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.restClusterClientConfiguration = configuration;
+   this.restClient = new 
RestClient(configuration.getRestEndpointConfiguration(), executorService);
+   }
+
+   @Override
+   public void shutdown() {
+   try {
+   // we only call this for legacy reasons to shutdown 
components that are started in the ClusterClient constructor
+   super.shutdown();
+   } catch (Exception e) {
+   log.error("An error occurred during the client 
shutdown.", e);
+   }
+   this.restClient.shutdown(Time.seconds(5));
+   this.executorService.shutdown();
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6 since slot-sharing isn't 
implemented yet
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142899909
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
--- End diff --

What does test `ABC` test?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142625774
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestClusterClient> {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
+   try {
+   return new RestClusterClient(config);
+   } catch (Exception e) {
+   throw new RuntimeException("Couldn't retrieve 
standalone cluster", e);
--- End diff --

I'd keep it like this to be consistent with all other implementations. I do 
think the signature should have a proper checked exception, but I wouldn't do 
it in this PR tbh.
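
To illustrate the follow-up mentioned above, here is a small sketch of a `retrieve` signature that declares a checked exception instead of wrapping failures in a `RuntimeException`. Everything in it is an assumption for illustration: `ClusterRetrievalException`, `DescriptorSketch` and `StandaloneDescriptorSketch` are hypothetical names, not part of this PR or of the existing `ClusterDescriptor` interface.

```java
import java.util.concurrent.Callable;

// Hypothetical checked exception for failed cluster retrieval.
final class ClusterRetrievalException extends Exception {
    ClusterRetrievalException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical descriptor interface whose signature exposes the failure mode.
interface DescriptorSketch<C> {
    C retrieve(String applicationId) throws ClusterRetrievalException;
}

// Hypothetical implementation: the client is built by a factory that may fail.
final class StandaloneDescriptorSketch<C> implements DescriptorSketch<C> {
    private final Callable<C> clientFactory;

    StandaloneDescriptorSketch(Callable<C> clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public C retrieve(String applicationId) throws ClusterRetrievalException {
        try {
            return clientFactory.call();
        } catch (Exception e) {
            // The cause is preserved, but callers are now forced to handle it.
            throw new ClusterRetrievalException("Couldn't retrieve standalone cluster", e);
        }
    }
}
```

The trade-off is that every existing implementation and caller would have to change together, which is why it fits better in a separate change.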


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142624817
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -554,6 +557,18 @@ protected int stop(String[] args) {
return handleArgException(new CliArgsException("Missing 
JobID"));
}
 
+   // FLIP-6 specific branch
+   try {
+   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+   if (activeCommandLine instanceof  Flip6DefaultCLI) {
+   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+   client.stop(jobId);
--- End diff --

It appears so.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142622511
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture<BlobServerPortResponseBody> portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142605823
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -554,6 +557,18 @@ protected int stop(String[] args) {
return handleArgException(new CliArgsException("Missing 
JobID"));
}
 
+   // FLIP-6 specific branch
+   try {
+   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+   if (activeCommandLine instanceof  Flip6DefaultCLI) {
+   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+   client.stop(jobId);
--- End diff --

So this should be possible to change, right?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142605564
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture<BlobServerPortResponseBody> portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142605663
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

Agreed.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142605279
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
+
+   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+   this(serializeJobGraph(jobGraph));
+   }
+
+   @JsonCreator
+   public JobSubmitRequestBody(
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
+
+   this.serializedJobGraph = serializedJobGraph;
+   }
+
+   @Override
+   public int hashCode() {
+   return 71 * Arrays.hashCode(this.serializedJobGraph);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object instanceof JobSubmitRequestBody) {
+   JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
+   return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
+   }
+   return false;
+   }
+
+   private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
--- End diff --

As a test, I serialized the `WindowJoin` JobGraph and its size was `103787` 
bytes. So I would suggest setting the initial buffer size to 64KB, maybe?
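
To illustrate the buffer-size point, here is a minimal sketch using only JDK classes. It is not the code from this PR: the class name, the constant, and the `serialize` helper are hypothetical, and a plain `byte[]` stands in for the `JobGraph`. It only shows that the initial capacity passed to `ByteArrayOutputStream` is a starting size, so a 64KB value avoids most of the early re-allocations for a payload of roughly 100KB without capping the output.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationBufferSketch {

    // Hypothetical constant: initial capacity following the 64KB suggestion.
    static final int INITIAL_BUFFER_SIZE = 64 * 1024;

    // Serializes any Serializable value with Java serialization.
    // The buffer grows automatically if the payload exceeds the initial capacity.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing a ByteArrayOutputStream has no effect, so the bytes are still available.
        return baos.toByteArray();
    }

    public static void main(String[] args) {
        // Stand-in payload of about the size reported for the WindowJoin JobGraph.
        byte[] payload = new byte[100_000];
        System.out.println("serialized size: " + serialize(payload).length);
    }
}
```

A smaller initial capacity such as 1024 still produces the same bytes; the difference is only in how often the internal array has to be copied while it grows.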


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142604070
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142513408
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

Overall it would make more sense to properly refactor the ClusterClient 
into an interface with 2 implementations. But that could be a next step; for 
now I'll add the call to super.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142512180
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

Well, unfortunately these things are started in the ClusterClient 
constructor, so we do have to call super, or refactor the client to only 
instantiate those when needed.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142508400
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
--- End diff --

Will modify the RestClusterClient to shut down the executor service.
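
For reference, a minimal sketch of what shutting down such an executor service could look like with plain JDK APIs. This is an assumption about the general pattern, not the change that was made in the PR: the class and method names are hypothetical, and the 5 second timeout merely mirrors the `Time.seconds(5)` used for the REST endpoint in the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {

    // Stops accepting new tasks, waits for running ones, and forces
    // termination if the timeout elapses. Returns true on a clean shutdown.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Small demo: a fixed pool of 4 threads, as in the diff, with one trivial task.
    static boolean runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        pool.submit(() -> { });
        return shutdownGracefully(pool, 5, TimeUnit.SECONDS) && pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runDemo());
    }
}
```

Without such a call, the non-daemon threads created by `Executors.newFixedThreadPool` would keep the JVM alive after the client is done.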


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142507812
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -554,6 +557,18 @@ protected int stop(String[] args) {
return handleArgException(new CliArgsException("Missing 
JobID"));
}
 
+   // FLIP-6 specific branch
+   try {
+   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+   if (activeCommandLine instanceof  Flip6DefaultCLI) {
+   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+   client.stop(jobId);
--- End diff --

The existing client doesn't use the ClusterClient to issue stop/cancel but 
goes directly through the actor system. While the ClusterClient DOES have a 
stop/cancel method, it is actually never called (god knows why...).


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142507072
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements 
ClusterDescriptor<RestClusterClient> {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
--- End diff --

in the case of FLIP-6 that is


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142506368
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142504388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to retrieve the port that the blob server runs 
on.
+ */
+public final class BlobServerPortHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+
+   public BlobServerPortHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time 
timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
BlobServerPortHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture<BlobServerPortResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   return gateway
+   .getBlobServerPort(Time.seconds(5))
+   .handleAsync((Integer port, Throwable error) -> {
+   if (error != null) {
+   log.error("Failed to retrieve blob 
server port.", ExceptionUtils.stripCompletionException(error));
+   return 
FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to retrieve blob server port.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+   } else {
+   return 
CompletableFuture.completedFuture(new BlobServerPortResponseBody(port));
+   }
+   })
+   .thenCompose(future -> future);
--- End diff --

Then we would lose the error message. What about:
```
return gateway
    .getBlobServerPort(timeout)
    .thenApply(BlobServerPortResponseBody::new)
    .exceptionally(error -> {
        throw new CompletionException(new RestHandlerException(
            "Failed to retrieve blob server port.",
            HttpResponseStatus.INTERNAL_SERVER_ERROR));
    });
```
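For readers following the thread, here is a minimal, JDK-only sketch of the `thenApply` / `exceptionally` pattern suggested above. `PortLookupSketch`, `HandlerException` and `toResponse` are hypothetical stand-ins, not Flink classes, and passing the original error along as the cause is an assumption that goes beyond the snippet in the comment.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins; none of these names are Flink classes.
final class PortLookupSketch {

    /** Stand-in for a REST-level exception that carries an HTTP status code. */
    static final class HandlerException extends Exception {
        final int status;

        HandlerException(String message, int status, Throwable cause) {
            super(message, cause);
            this.status = status;
        }
    }

    /** Maps a successful port to a response string and any failure to a HandlerException. */
    static CompletableFuture<String> toResponse(CompletableFuture<Integer> portFuture) {
        return portFuture
            .thenApply(port -> "port=" + port)
            .exceptionally(error -> {
                // Assumption: keep the upstream failure (possibly wrapped in a
                // CompletionException) as the cause so its details stay reachable.
                throw new CompletionException(
                    new HandlerException("Failed to retrieve blob server port.", 500, error));
            });
    }

    public static void main(String[] args) {
        System.out.println(toResponse(CompletableFuture.completedFuture(6124)).join());

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("blob server not running"));
        try {
            toResponse(failed).join();
        } catch (CompletionException e) {
            // The caller sees the HandlerException as the cause of the failed future.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

Compared with `handleAsync(...)` followed by `thenCompose(future -> future)`, this keeps the success and failure paths in separate stages and avoids the nested future.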


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142502599
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
--- End diff --

A wrapper as in a class that extends JobGraph and overrides every method? 
This seems a bit tricky to maintain without an interface to implement (granted, 
the JobGraph rarely changes, but if it does, things would probably blow up).
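The maintenance concern can be illustrated with a small sketch. `Graph` and `GraphWrapper` are hypothetical stand-ins (not Flink's `JobGraph`), and the scenario assumes a getter was added to the base class after the wrapper was written.

```java
// Hypothetical stand-ins; Graph is not Flink's JobGraph.
final class WrapperSketch {

    static class Graph {
        private final String name;
        private final int parallelism;

        Graph(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }

        String getName() {
            return name;
        }

        // Imagine this getter was added after GraphWrapper was written.
        int getParallelism() {
            return parallelism;
        }
    }

    /** Wrapper by subclassing: every method must be overridden by hand. */
    static final class GraphWrapper extends Graph {
        private final Graph delegate;

        GraphWrapper(Graph delegate) {
            super("unused", 1);
            this.delegate = delegate;
        }

        @Override
        String getName() {
            return delegate.getName();
        }
        // getParallelism() is not overridden, so it reads the wrapper's own
        // placeholder state instead of the delegate's. The compiler does not complain.
    }

    public static void main(String[] args) {
        Graph wrapped = new GraphWrapper(new Graph("job", 4));
        System.out.println(wrapped.getName());
        System.out.println(wrapped.getParallelism());
    }
}
```

With an interface, adding a method would cause a compile error in the wrapper; with subclassing, the missing override goes unnoticed.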


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142501950
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
+   return FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to deserialize JobGraph.", 
HttpResponseStatus.BAD_REQUEST));
--- End diff --

Or just handle it locally and return properly.
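A rough sketch of what "handle it locally" could look like, using only the JDK: the deserialization failure is caught where it happens and turned into a failed future instead of being thrown to the caller. `LocalFailureSketch` and its methods are hypothetical names, and `IllegalArgumentException` stands in for the `RestHandlerException` with `BAD_REQUEST` used in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; the names are illustrative only.
final class LocalFailureSketch {

    /** Deserializes the payload; a bad payload yields a failed future, not a thrown exception. */
    static CompletableFuture<Object> deserialize(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return CompletableFuture.completedFuture(in.readObject());
        } catch (IOException | ClassNotFoundException e) {
            // Stand-in for a RestHandlerException carrying BAD_REQUEST.
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalArgumentException("Failed to deserialize payload.", e));
            return failed;
        }
    }

    /** Serializes a value with Java serialization, for use in the demo below. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize("job")).join());
        System.out.println(deserialize(new byte[] {1, 2, 3}).isCompletedExceptionally());
    }
}
```

The caller then deals with a single failure channel, the returned future, whether the error happened during deserialization or later in the asynchronous call.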


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142501847
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
--- End diff --

Why? We can handle the exception here and return properly.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142500763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
+
+   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+   this(serializeJobGraph(jobGraph));
+   }
+
+   @JsonCreator
+   public JobSubmitRequestBody(
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
+
+   this.serializedJobGraph = serializedJobGraph;
+   }
+
+   @Override
+   public int hashCode() {
+   return 71 * Arrays.hashCode(this.serializedJobGraph);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object instanceof JobSubmitRequestBody) {
+   JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
+   return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
+   }
+   return false;
+   }
+
+   private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
--- End diff --

What value would you suggest?
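As background for the sizing question: the constructor argument of `ByteArrayOutputStream` is only an initial capacity, and the buffer grows as data is written. The sketch below (hypothetical class `BufferSizeSketch`) shows that the number of bytes written is the same regardless of the starting size, so the choice mainly affects how often the buffer is reallocated.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical demo class; not part of the pull request.
final class BufferSizeSketch {

    /** Writes count bytes into a stream created with the given initial capacity. */
    static int write(int initialCapacity, int count) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(initialCapacity);
        for (int i = 0; i < count; i++) {
            out.write(i); // only the low-order byte of i is written
        }
        return out.size();
    }

    public static void main(String[] args) {
        // Both calls store all 10000 bytes; only the number of internal resizes differs.
        System.out.println(write(1024, 10_000));
        System.out.println(write(32, 10_000));
    }
}
```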


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142228578
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements 
ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
+   try {
+   return new RestClusterClient(config);
+   } catch (Exception e) {
+   throw new RuntimeException("Couldn't retrieve 
standalone cluster", e);
--- End diff --

How about adding a `Flip6ClusterException` to wrap this exception?
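A sketch of what such a wrapper exception might look like. Only the name `Flip6ClusterException` comes from the comment above; its shape (unchecked, message plus cause) and the `connect` / `retrieve` helpers are assumptions made for illustration.

```java
// Hypothetical sketch; only the exception name is taken from the review comment.
final class ClusterExceptionSketch {

    /** Assumed shape: an unchecked exception that keeps the original cause. */
    static final class Flip6ClusterException extends RuntimeException {
        Flip6ClusterException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for client construction that may fail with a checked exception. */
    static String connect(String address) throws Exception {
        if (address.isEmpty()) {
            throw new Exception("no address configured");
        }
        return "client@" + address;
    }

    /** Wraps any construction failure in the dedicated exception type. */
    static String retrieve(String address) {
        try {
            return connect(address);
        } catch (Exception e) {
            throw new Flip6ClusterException("Couldn't retrieve standalone cluster", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(retrieve("localhost:8081"));
        try {
            retrieve("");
        } catch (Flip6ClusterException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

A dedicated type lets callers catch cluster retrieval failures specifically instead of a bare `RuntimeException`.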


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142258856
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142258814
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142257641
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

We don't need to call super. The ClusterClient implementation shuts down 
the HA services and the actorSystemLoader, which we don't use.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142255730
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements 
ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
--- End diff --

It's always null.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142251500
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142207196
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142157044
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest {
+
+   @Test
+   public void testSerializationFailureHandling() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]);
+
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway);
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get();
+   Assert.fail();
+   } catch (ExecutionException ee) {
+   RestHandlerException rhe = (RestHandlerException) ee.getCause();
+
+   Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus());
+   }
+   }
+
+   @Test
+   public void testSuccessfulJobSubmission() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobGraph job = new JobGraph("testjob");
+   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+   handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get();
--- End diff --

We call `get()` on the returned future, which will fail if the job submission 
was not successful. But I will move the `get()` call to the next line, since it 
can be easily missed.
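
A minimal sketch of why `get()` acts as the success check here. It does not use the Flink handler; `submit` is a hypothetical stand-in that returns a `CompletableFuture`, and the exception type is an assumption chosen for illustration. The point is only that a failed future surfaces its cause through `ExecutionException` when `get()` is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureGetSketch {

    /** Hypothetical stand-in for a handler call that returns a future. */
    static CompletableFuture<String> submit(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (succeed) {
            future.complete("ack");
        } else {
            future.completeExceptionally(new IllegalStateException("submission failed"));
        }
        return future;
    }

    /** Returns the cause surfaced by get(), or null if the future completed normally. */
    static Throwable causeOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the interesting part.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> response = submit(true);
        // Keeping get() on its own line makes the implicit success check visible.
        response.get();

        System.out.println(causeOf(submit(false)));
    }
}
```

Splitting the call and the `get()` onto separate lines does not change behaviour; it only makes the assertion harder to overlook when reading the test.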


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156728
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest {
+   private static final int PORT = 64;
+
+   @Test
+   public void testPortRetrieval() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT));
+   GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   BlobServerPortHandler handler = new BlobServerPortHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   BlobServerPortResponseBody portResponse = handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway).get();
+
+   Assert.assertEquals(PORT, portResponse.port);
+   }
+
+   @Test
+   public void testPortRetrievalFailureHandling() throws Exception {
+
+   }
--- End diff --

jup


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156635
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

The current max is 10 MB.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156568
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
-   private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+   public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

so that you can actually access (and resolve) them in 
`RestClusterClient#stop()`.
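
A rough sketch of the access pattern this enables. None of the types below are the Flink classes; `Param`, `TerminationParams` and the URL layout are hypothetical simplifications, used only to show a caller resolving parameters through public final fields before building a request.

```java
public class ParameterAccessSketch {

    /** Hypothetical, simplified parameter that can be resolved exactly once. */
    static final class Param {
        private final String key;
        private String value;

        Param(String key) {
            this.key = key;
        }

        void resolve(String value) {
            if (this.value != null) {
                throw new IllegalStateException("Parameter " + key + " was already resolved.");
            }
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    /** Hypothetical parameter holder; the fields are public so a client can resolve them. */
    static final class TerminationParams {
        public final Param jobId = new Param("jobid");
        public final Param mode = new Param("mode");
    }

    /** What a client-side stop() might do: resolve both parameters, then build the target. */
    static String buildUrl(String jobId, String mode) {
        TerminationParams params = new TerminationParams();
        params.jobId.resolve(jobId);
        params.mode.resolve(mode);
        // The URL layout is an assumption made for this sketch.
        return "/jobs/" + params.jobId.getValue() + "?mode=" + params.mode.getValue();
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("abc", "stop"));
    }
}
```

The fields stay `final`, so callers can resolve the parameters but cannot swap them out.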


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142155888
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141870072
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

calling the super method is missing. Or don't we need this?
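
To illustrate the concern, here is a small sketch with hypothetical classes (`BaseClient` and `RestLikeClient` are not the Flink types). It assumes the base class releases resources of its own in `shutdown()`; whether that holds for `ClusterClient` is exactly the open question. If it does, an override that skips `super.shutdown()` would leak them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShutdownOrderSketch {

    /** Hypothetical base class that owns resources of its own. */
    static class BaseClient {
        final List<String> events = new ArrayList<>();

        public void shutdown() {
            events.add("base resources released");
        }
    }

    /** Hypothetical subclass that adds a REST endpoint on top. */
    static class RestLikeClient extends BaseClient {
        @Override
        public void shutdown() {
            try {
                // Stop the subclass-specific resource first.
                events.add("rest endpoint stopped");
            } finally {
                // Always delegate, even if stopping the endpoint throws.
                super.shutdown();
            }
        }
    }

    static List<String> run() {
        RestLikeClient client = new RestLikeClient();
        client.shutdown();
        return client.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The `try`/`finally` is one possible ordering; calling `super.shutdown()` last mirrors the usual rule of releasing resources in reverse order of acquisition.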


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142102357
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
--- End diff --

Can `applicationID` be `Nullable`?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141870295
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
--- End diff --

Maybe add that this is necessary because Flip-6 does not support scheduling 
with slot sharing at the moment.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to retrieve the port that the blob server runs 
on.
+ */
+public final class BlobServerPortHandler extends 
AbstractRestHandler {
+
+   public BlobServerPortHandler(CompletableFuture 
localRestAddress, GatewayRetriever leaderRetriever, Time 
timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
BlobServerPortHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   return gateway
+   .getBlobServerPort(Time.seconds(5))
+   .handleAsync((Integer port, Throwable error) -> {
--- End diff --

We should not call the `async` variants without providing a thread pool. 
Otherwise these callbacks are executed by the global thread pool.
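
To illustrate the point, here is a minimal sketch (not the PR's code) of passing an explicit executor to `handleAsync`. The class name, the thread name `rest-callback-pool` and the port value are made up for the example. Without the second argument, `CompletableFuture` would typically schedule the callback on `ForkJoinPool.commonPool()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncExecutorSketch {

    // Hypothetical helper: a single-threaded pool whose thread has a recognizable name.
    static ExecutorService newNamedExecutor() {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "rest-callback-pool");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Returns the name of the thread that executed the callback.
    static String callbackThread(ExecutorService executor) {
        CompletableFuture<Integer> port = CompletableFuture.completedFuture(6124);
        // The executor argument keeps the callback off the common pool.
        return port
            .handleAsync((value, error) -> Thread.currentThread().getName(), executor)
            .join();
    }

    public static void main(String[] args) {
        ExecutorService executor = newNamedExecutor();
        try {
            System.out.println(callbackThread(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

In a handler, the executor would presumably come from the surrounding endpoint rather than being created per call.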


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107255
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
--- End diff --

The logging should happen at a higher level (`AbstractRestHandler`).


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142120909
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
+
+   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+   this(serializeJobGraph(jobGraph));
+   }
+
+   @JsonCreator
+   public JobSubmitRequestBody(
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
+
+   this.serializedJobGraph = serializedJobGraph;
--- End diff --

Check not null.
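
A minimal sketch of such a check, using the JDK's `Objects.requireNonNull` so that the example is self-contained; in Flink code the usual helper would be `Preconditions.checkNotNull`. The `RequestBody` class below is a hypothetical stand-in for `JobSubmitRequestBody`, not the actual class.

```java
import java.util.Objects;

public class NullCheckSketch {

    // Hypothetical stand-in for the request body under review.
    static final class RequestBody {
        final byte[] serializedJobGraph;

        RequestBody(byte[] serializedJobGraph) {
            // Fail fast at construction time instead of later during deserialization.
            this.serializedJobGraph =
                Objects.requireNonNull(serializedJobGraph, "serializedJobGraph must not be null");
        }
    }

    // Returns true if constructing the body with null is rejected.
    static boolean rejectsNull() {
        try {
            new RequestBody(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejectsNull());
    }
}
```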


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

Is there a size limitation for what we can send via the REST client/server?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121438
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest {
--- End diff --

The test class should extend `TestLogger`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
+
+   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+   this(serializeJobGraph(jobGraph));
+   }
+
+   @JsonCreator
+   public JobSubmitRequestBody(
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
+
+   this.serializedJobGraph = serializedJobGraph;
+   }
+
+   @Override
+   public int hashCode() {
+   return 71 * Arrays.hashCode(this.serializedJobGraph);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object instanceof JobSubmitRequestBody) {
+   JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
+   return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
+   }
+   return false;
+   }
+
+   private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
--- End diff --

The initial size might be a bit too low, I would assume.
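
For context, a small sketch of how the initial capacity behaves: `ByteArrayOutputStream` grows its buffer on demand, so the constructor argument only affects how often the buffer is reallocated, not the result. The class and method names and the sample payload are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

public class SerializeSketch {

    // Serializes a value with Java serialization, using the given initial buffer capacity.
    static byte[] serialize(Serializable value, int initialCapacity) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(initialCapacity);
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Hypothetical payload that is clearly larger than 1024 bytes once serialized.
    static ArrayList<Integer> samplePayload() {
        ArrayList<Integer> values = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            values.add(i);
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] small = serialize(samplePayload(), 16);
        byte[] large = serialize(samplePayload(), 64 * 1024);
        System.out.println(small.length + " bytes, same size either way: " + (small.length == large.length));
    }
}
```

A larger initial size mainly saves a few array copies for big job graphs; whether that matters here is a judgment call.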


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142106967
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to retrieve the port that the blob server runs 
on.
+ */
+public final class BlobServerPortHandler extends 
AbstractRestHandler {
+
+   public BlobServerPortHandler(CompletableFuture 
localRestAddress, GatewayRetriever leaderRetriever, Time 
timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
BlobServerPortHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   return gateway
+   .getBlobServerPort(Time.seconds(5))
+   .handleAsync((Integer port, Throwable error) -> {
+   if (error != null) {
+   log.error("Failed to retrieve blob 
server port.", ExceptionUtils.stripCompletionException(error));
+   return 
FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to retrieve blob server port.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+   } else {
+   return 
CompletableFuture.completedFuture(new BlobServerPortResponseBody(port));
+   }
+   })
+   .thenCompose(future -> future);
--- End diff --

Could this be simplified to the following?

```
return gateway.getBlobServerPort(timeout).thenApply(BlobServerPortResponseBody::new);
```
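
The suggestion can be sketched in isolation as follows. `PortResponse` is a hypothetical stand-in for `BlobServerPortResponseBody`, and the port value is made up. The sketch shows that `thenApply` maps a successful result and passes a failure through unchanged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ThenApplySketch {

    // Hypothetical stand-in for the response body type.
    static final class PortResponse {
        final int port;

        PortResponse(int port) {
            this.port = port;
        }
    }

    // Maps the port to a response; an exceptional future stays exceptional.
    static CompletableFuture<PortResponse> toResponse(CompletableFuture<Integer> portFuture) {
        return portFuture.thenApply(PortResponse::new);
    }

    // Returns the underlying failure cause, or null if the future succeeded.
    static Throwable failureCause(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(toResponse(CompletableFuture.completedFuture(6124)).join().port);

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no port"));
        System.out.println(failureCause(toResponse(failed)));
    }
}
```

One difference from the code under review: with plain `thenApply` the original exception is propagated as is, whereas the diff wraps failures in a `RestHandlerException` with `INTERNAL_SERVER_ERROR`.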


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107328
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
+   return FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to deserialize JobGraph.", 
HttpResponseStatus.BAD_REQUEST));
+   }
+
+   gateway.submitJob(jobGraph, Time.seconds(5));
+
+   return CompletableFuture.completedFuture(new 
JobSubmitResponseBody("Submitted job with ID " + jobGraph.getJobID() + "."));
--- End diff --

nit: '.'


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142105206
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
--- End diff --

The test class should extend `TestLogger`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121886
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest {
+
+   @Test
+   public void testSerializationFailureHandling() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobSubmitRequestBody request = new JobSubmitRequestBody(new 
byte[0]);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway);
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway).get();
+   Assert.fail();
+   } catch (ExecutionException ee) {
+   RestHandlerException rhe = (RestHandlerException) 
ee.getCause();
+
+   Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, 
rhe.getHttpResponseStatus());
+   }
+   }
+
+   @Test
+   public void testSuccessfulJobSubmission() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobGraph job = new JobGraph("testjob");
+   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway).get();
--- End diff --

Should we assert something here?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141868129
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -635,6 +650,18 @@ protected int cancel(String[] args) {
return 1;
}
 
+   // FLIP-6 specific branch
+   try {
+   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+   if (activeCommandLine instanceof Flip6DefaultCLI) {
+   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+   client.cancel(jobId);
--- End diff --

Same here for cancel.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141870518
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
--- End diff --

Can we throw a `ProgramInvocationException` here? If not, then we should 
maybe introduce a new checked exception.
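
A minimal sketch of the first option, assuming the cause can simply be wrapped. `SubmissionFailure` and `InvocationFailure` are hypothetical stand-ins for `JobSubmissionException` and `ProgramInvocationException`, not Flink's real classes. The low-level checked exception is rethrown as the checked exception the method already declares, with the cause preserved:

```java
// Hypothetical stand-in for JobSubmissionException.
class SubmissionFailure extends Exception {
    SubmissionFailure(String message) {
        super(message);
    }
}

// Hypothetical stand-in for ProgramInvocationException.
class InvocationFailure extends Exception {
    InvocationFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

class SubmitSketch {
    // Simulates the inner submit call, which may fail with a checked exception.
    static void doSubmit(boolean fail) throws SubmissionFailure {
        if (fail) {
            throw new SubmissionFailure("blob upload failed");
        }
    }

    // Rethrows as the declared checked exception instead of a RuntimeException,
    // so callers are forced by the compiler to handle the failure.
    static String submit(boolean fail) throws InvocationFailure {
        try {
            doSubmit(fail);
            return "submitted";
        } catch (SubmissionFailure e) {
            throw new InvocationFailure("Could not submit job.", e);
        }
    }
}
```

Callers that already catch the declared exception would then see submission failures without any extra handling for unchecked exceptions.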


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141871653
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TODO: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141868101
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -554,6 +557,18 @@ protected int stop(String[] args) {
return handleArgException(new CliArgsException("Missing 
JobID"));
}
 
+   // FLIP-6 specific branch
+   try {
+   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+   if (activeCommandLine instanceof  Flip6DefaultCLI) {
+   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+   client.stop(jobId);
--- End diff --

Could we integrate the other stopping behaviour from the old code into the 
`ClusterClient`? Then we would simply retrieve the `ClusterClient` and call 
`stop` without having this special case where we exit early in case of Flip-6?
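
A rough sketch of what this could look like, with hypothetical stand-in classes (`ClientSketch`, `LegacyClientSketch`, `RestClientSketch`, `FrontendSketch`) rather than the real `ClusterClient` hierarchy. Each client owns its stop logic, so the frontend needs neither an `instanceof` check nor an early return:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ClusterClient: every implementation owns its stop logic.
abstract class ClientSketch {
    final List<String> log = new ArrayList<>();

    abstract void stop(String jobId);
}

// Would contain the stopping behaviour moved out of the old frontend code.
class LegacyClientSketch extends ClientSketch {
    @Override
    void stop(String jobId) {
        log.add("legacy-stop:" + jobId);
    }
}

// Would issue the REST request for Flip-6 clusters.
class RestClientSketch extends ClientSketch {
    @Override
    void stop(String jobId) {
        log.add("rest-stop:" + jobId);
    }
}

class FrontendSketch {
    // Single code path: retrieve the client, call stop, return the exit code.
    static int stop(ClientSketch client, String jobId) {
        client.stop(jobId);
        return 0;
    }
}
```

The same shape would apply to `cancel`.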


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141869880
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
--- End diff --

Maybe rename field to `restClient`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142104259
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.util.ConfigurationException;
+
+/**
+ * A configuration object for {@link RestClusterClient}s.
+ */
+public final class RestClusterClientConfiguration {
+
+   private final String blobServerAddress;
+
+   private final RestClientConfiguration restEndpointConfiguration;
+
+   private final String restServerAddress;
+
+   private final int restServerPort;
+
+   private RestClusterClientConfiguration(
+   String blobServerAddress,
+   RestClientConfiguration endpointConfiguration,
+   String restServerAddress,
+   int restServerPort) {
+   this.blobServerAddress = blobServerAddress;
+   this.restEndpointConfiguration = endpointConfiguration;
+   this.restServerAddress = restServerAddress;
+   this.restServerPort = restServerPort;
+   }
+
+   public String getBlobServerAddress() {
+   return blobServerAddress;
+   }
+
+   public String getRestServerAddress() {
+   return restServerAddress;
+   }
+
+   public int getRestServerPort() {
+   return restServerPort;
+   }
+
+   public RestClientConfiguration getRestEndpointConfiguration() {
+   return restEndpointConfiguration;
+   }
+
+   public static RestClusterClientConfiguration 
fromConfiguration(Configuration config) throws ConfigurationException {
+   String address = config.getString(JobManagerOptions.ADDRESS);
--- End diff --

Maybe rename to `blobServerAddress`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142103754
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.util.ConfigurationException;
+
+/**
+ * A configuration object for {@link RestClusterClient}s.
+ */
+public final class RestClusterClientConfiguration {
+
+   private final String blobServerAddress;
+
+   private final RestClientConfiguration restEndpointConfiguration;
+
+   private final String restServerAddress;
+
+   private final int restServerPort;
+
+   private RestClusterClientConfiguration(
+   String blobServerAddress,
+   RestClientConfiguration endpointConfiguration,
+   String restServerAddress,
+   int restServerPort) {
+   this.blobServerAddress = blobServerAddress;
+   this.restEndpointConfiguration = endpointConfiguration;
+   this.restServerAddress = restServerAddress;
+   this.restServerPort = restServerPort;
+   }
+
+   public String getBlobServerAddress() {
+   return blobServerAddress;
+   }
+
+   public String getRestServerAddress() {
+   return restServerAddress;
+   }
+
+   public int getRestServerPort() {
+   return restServerPort;
+   }
+
+   public RestClientConfiguration getRestEndpointConfiguration() {
+   return restEndpointConfiguration;
+   }
+
+   public static RestClusterClientConfiguration 
fromConfiguration(Configuration config) throws ConfigurationException {
+   String address = config.getString(JobManagerOptions.ADDRESS);
+
+   String serverAddress = 
config.getString(RestOptions.REST_ADDRESS);
+   int serverPort = config.getInteger(RestOptions.REST_PORT);
+
+   RestClientConfiguration endpointConfiguration = 
RestClientConfiguration.fromConfiguration(config);
--- End diff --

Maybe rename to `restClientConfiguration`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107229
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+
+   public JobSubmitHandler(CompletableFuture<String> localRestAddress, 
GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
+   return FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to deserialize JobGraph.", 
HttpResponseStatus.BAD_REQUEST));
--- End diff --

Here we could throw the `RestHandlerException` right away.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121549
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest {
+   private static final int PORT = 64;
+
+   @Test
+   public void testPortRetrieval() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT));
+   GatewayRetriever mockGatewayRetriever = 
mock(GatewayRetriever.class);
+
+   BlobServerPortHandler handler = new BlobServerPortHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   BlobServerPortResponseBody portResponse = 
handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), 
EmptyMessageParameters.getInstance()), mockGateway).get();
+
+   Assert.assertEquals(PORT, portResponse.port);
+   }
+
+   @Test
+   public void testPortRetrievalFailureHandling() throws Exception {
+
+   }
--- End diff --

Did you forget to implement this test?
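
One possible shape for the failure case, sketched with plain `CompletableFuture`s instead of Mockito and the real handler. `HandlerFailure` and `PortHandlerSketch` are hypothetical stand-ins, and the HTTP 500 mapping is an assumption about what the handler should do when the gateway call fails:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for RestHandlerException.
class HandlerFailure extends Exception {
    final int httpStatus;

    HandlerFailure(String message, int httpStatus) {
        super(message);
        this.httpStatus = httpStatus;
    }
}

class PortHandlerSketch {
    // Maps a failed port lookup to a handler-level failure carrying HTTP 500.
    static CompletableFuture<Integer> handle(CompletableFuture<Integer> portFuture) {
        return portFuture.exceptionally(error -> {
            throw new CompletionException(
                new HandlerFailure("Failed to retrieve blob server port.", 500));
        });
    }

    // The check a failure test could make: returns the HTTP status of the
    // failure, -1 if the future succeeded, -2 for an unexpected failure type.
    static int statusOfFailure(CompletableFuture<Integer> result) {
        try {
            result.join();
            return -1;
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return cause instanceof HandlerFailure ? ((HandlerFailure) cause).httpStatus : -2;
        }
    }
}
```

In the real test, the mocked gateway would return an exceptionally completed future and the assertion would inspect the resulting `RestHandlerException`.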


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141869317
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
--- End diff --

Can we make this final?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142106602
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to retrieve the port that the blob server runs 
on.
+ */
+public final class BlobServerPortHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+
+   public BlobServerPortHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time 
timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
BlobServerPortHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture<BlobServerPortResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   return gateway
+   .getBlobServerPort(Time.seconds(5))
--- End diff --

We should use the passed-in `timeout` variable.
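
Roughly like this, sketched with hypothetical stand-ins (`GatewaySketch`, `TimeoutHandlerSketch`) and `java.time.Duration` in place of Flink's `Time`. The handler keeps the timeout it was constructed with and forwards it to the gateway instead of hard-coding five seconds:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the gateway; records the timeout it was called with.
class GatewaySketch {
    Duration lastTimeout;

    CompletableFuture<Integer> getBlobServerPort(Duration timeout) {
        lastTimeout = timeout;
        return CompletableFuture.completedFuture(64);
    }
}

class TimeoutHandlerSketch {
    private final Duration timeout;

    TimeoutHandlerSketch(Duration timeout) {
        this.timeout = timeout;
    }

    CompletableFuture<Integer> handleRequest(GatewaySketch gateway) {
        // Forward the configured timeout rather than a hard-coded value.
        return gateway.getBlobServerPort(timeout);
    }
}
```

This way the RPC timeout follows whatever the REST endpoint was configured with.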


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142120781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
--- End diff --

Here we are doing unnecessary work by deserializing the `JobGraph`. It 
would be best if we could simply forward the serialized `JobGraph` to the 
`Dispatcher`. Maybe we could have a wrapper which lazily deserializes the 
`JobGraph` on the `Dispatcher`?
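
A minimal sketch of what such a wrapper could look like, using only the JDK. `LazySerialized` and its methods are hypothetical names chosen for illustration, not existing classes in this PR. The idea is that the handler forwards the bytes untouched and only the receiving side calls `get()`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical wrapper: carries the serialized bytes and deserializes only on first access. */
final class LazySerialized<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] bytes;
    // Cache for the deserialized value; transient so only the bytes travel over the wire.
    private transient T value;

    LazySerialized(byte[] bytes) {
        this.bytes = bytes.clone();
    }

    /** Serializes the value eagerly on the sending side. */
    static <T extends Serializable> LazySerialized<T> of(T value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(value);
        }
        return new LazySerialized<>(baos.toByteArray());
    }

    synchronized boolean isDeserialized() {
        return value != null;
    }

    /** Deserializes on the first call and returns the cached value afterwards. */
    @SuppressWarnings("unchecked")
    synchronized T get() throws IOException, ClassNotFoundException {
        if (value == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                value = (T) in.readObject();
            }
        }
        return value;
    }
}
```

With something along these lines, the REST handler would never touch the `JobGraph` classes, and a deserialization failure would surface on the `Dispatcher` side instead.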


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107308
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
+   return FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to deserialize JobGraph.", 
HttpResponseStatus.BAD_REQUEST));
+   }
+
+   gateway.submitJob(jobGraph, Time.seconds(5));
--- End diff --

Use the `timeout` variable here instead of the hard-coded `Time.seconds(5)`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107424
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler {
+
+   public JobSubmitHandler(CompletableFuture localRestAddress, 
GatewayRetriever leaderRetriever, Time timeout) {
+   super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   JobGraph jobGraph;
+   try {
+   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+   jobGraph = (JobGraph) objectIn.readObject();
+   } catch (Exception e) {
+   log.error("Failed to deserialize JobGraph.", e);
+   return FutureUtils.completedExceptionally(new 
RestHandlerException("Failed to deserialize JobGraph.", 
HttpResponseStatus.BAD_REQUEST));
+   }
+
+   gateway.submitJob(jobGraph, Time.seconds(5));
+
+   return CompletableFuture.completedFuture(new 
JobSubmitResponseBody("Submitted job with ID " + jobGraph.getJobID() + "."));
--- End diff --

We should return the `gateway.submitJob` future because it might contain a 
failure.
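
Roughly what I have in mind, as a self-contained sketch. `Gateway` is a hypothetical stand-in for the `DispatcherGateway`, and plain strings replace the real job and response types. Only the shape of the future chaining matters:

```java
import java.util.concurrent.CompletableFuture;

final class SubmitSketch {
    /** Hypothetical stand-in for the dispatcher gateway; only the asynchronous shape matters. */
    interface Gateway {
        CompletableFuture<String> submitJob(String jobId);
    }

    /** Derives the response from the submission future, so a failed submission fails the response too. */
    static CompletableFuture<String> handleRequest(Gateway gateway, String jobId) {
        return gateway.submitJob(jobId)
            .thenApply(ack -> "Submitted job with ID " + jobId + ".");
    }
}
```

Because the response is derived via `thenApply`, an exceptionally completed submission future propagates to the caller instead of being dropped.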


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121354
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
+
+   public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+   this(serializeJobGraph(jobGraph));
+   }
+
+   @JsonCreator
+   public JobSubmitRequestBody(
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
+
+   this.serializedJobGraph = serializedJobGraph;
+   }
+
+   @Override
+   public int hashCode() {
+   return 71 * Arrays.hashCode(this.serializedJobGraph);
+   }
+
+   @Override
+   public boolean equals(Object object) {
+   if (object instanceof JobSubmitRequestBody) {
+   JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
+   return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
+   }
+   return false;
+   }
+
+   private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+   ObjectOutputStream out = new ObjectOutputStream(baos);
+
+   out.writeObject(jobGraph);
+
+   return baos.toByteArray();
--- End diff --

We could close the streams, for example with a try-with-resources block. 
Admittedly, this is more a matter of idiom than a real problem here.
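
For illustration, a try-with-resources variant could look like the sketch below. `SerializationSketch` is a hypothetical name, and the method takes any `Serializable` so that it runs without Flink on the classpath:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationSketch {
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(value);
        }
        // Closing the ObjectOutputStream flushes it, so the bytes are read only afterwards.
        return baos.toByteArray();
    }
}
```

Reading the array after the block is safe because `ByteArrayOutputStream` remains usable after `close()`.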


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141872116
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142121606
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest {
--- End diff --

This test should extend `TestLogger`.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142103273
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141871482
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TOOD: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142107556
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

Why make them public?


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141869726
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4));
--- End diff --

Who shuts down the thread pool?
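
One way to address this, shown as a minimal sketch rather than the PR's actual code: keep the executor in a field and shut it down together with the client. `OwningClient` is a hypothetical stand-in for `RestClusterClient`; the 5 second timeout merely mirrors the value passed to `restEndpoint.shutdown` in the diff, and only JDK classes are used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a client that owns the executor it creates. */
final class OwningClient implements AutoCloseable {

    // Held in a field so the pool can be shut down later; an executor created
    // inline in a constructor argument has no owner that could do this.
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    ExecutorService executor() {
        return executor;
    }

    boolean isShutDown() {
        return executor.isShutdown();
    }

    /** Stops accepting new tasks and waits briefly for running ones to finish. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
```

With this shape, the client's own `shutdown()` could call `close()` after shutting down the REST endpoint, so the threads do not outlive the client.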


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141869062
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
--- End diff --

check not null
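
A minimal sketch of such a check, using the JDK's `Objects.requireNonNull` so that it is self-contained; inside Flink, `Preconditions.checkNotNull` from `org.apache.flink.util` would be the more usual choice. `DescriptorSketch` and its `Object`-typed field are hypothetical stand-ins for the descriptor and its `Configuration`.

```java
import java.util.Objects;

/** Hypothetical descriptor, reduced to the constructor check under discussion. */
final class DescriptorSketch {

    private final Object config;

    DescriptorSketch(Object config) {
        // Fail fast at construction time instead of with a NullPointerException
        // at some later, harder to trace, point of use.
        this.config = Objects.requireNonNull(config, "config must not be null");
    }

    Object config() {
        return config;
    }
}
```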


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141872139
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TODO: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141872335
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 ---
@@ -0,0 +1,77 @@
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.util.ConfigurationException;
+
+/**
+ * A configuration object for {@link RestClusterClient}s.
+ */
+public final class RestClusterClientConfiguration {
+
+   private final String blobServerAddress;
+
+   private final RestClientConfiguration restEndpointConfiguration;
+
+   private final String restServerAddress;
+
+   private final int restServerPort;
+
+   private RestClusterClientConfiguration(
+   String blobServerAddress,
+   RestClientConfiguration endpointConfiguration,
+   String restServerAddress,
+   int restServerPort) {
+   this.blobServerAddress = blobServerAddress;
+   this.restEndpointConfiguration = endpointConfiguration;
+   this.restServerAddress = restServerAddress;
--- End diff --

check not null for the above fields


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r141869527
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
--- End diff --

Maybe we could rename it to `restClusterClientConfiguration`. That way 
people are less likely to confuse it with the other `config` field.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142106096
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-09-28 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4742

[FLINK-7072] [REST] Add Flip-6 client for submit/job/cancel

This PR builds on #4730.

## What is the purpose of the change

This PR adds a new ClusterClient specifically for Flip-6 using the new REST 
architecture. It supports submitting, canceling and stopping jobs.

Job submission is not done purely via REST: the jar upload still goes 
through the blob server. To submit a job, we therefore first query the blob 
server port via REST, upload the jars to the blob server, and then submit the 
job graph via REST.
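
The ordering of these three steps can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: the `Cluster` interface and its method names are hypothetical stand-ins for the REST client and blob client calls, and the job graph and jars are reduced to strings.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch of the three-step submission order; all names here are hypothetical. */
final class SubmissionFlowSketch {

    /** Hypothetical stand-in for the REST endpoint and the blob server. */
    interface Cluster {
        CompletableFuture<Integer> requestBlobServerPort();        // step 1, via REST
        void uploadJars(int blobServerPort, List<String> jars);    // step 2, via blob server
        CompletableFuture<String> submitJobGraph(String jobGraph); // step 3, via REST
    }

    static CompletableFuture<String> submit(Cluster cluster, String jobGraph, List<String> jars) {
        return cluster.requestBlobServerPort()
            // The jars can only be uploaded once the port is known.
            .thenApply(port -> {
                cluster.uploadJars(port, jars);
                return port;
            })
            // The job graph is submitted only after the upload has returned.
            .thenCompose(port -> cluster.submitJobGraph(jobGraph));
    }
}
```

Chaining the steps on the future keeps the required order explicit, and a failure in the port query or the upload completes the returned future exceptionally without ever submitting the job graph.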

The stopping and canceling of jobs uses the existing JobTerminationHandler 
that was recently added.

## Brief change log

* define REST protocol for job submissions
* modify dispatcher to expose the blob server port
* add handlers to dispatcher endpoint for querying the blob server port and 
submitting jobs
* add a new ClusterClient for Flip6 and integrate it into the CLI

## Verifying this change

This change added tests and can be verified as follows:

- start a flip6 cluster using `./bin/start-cluster.sh flip6`
- submit a job using `./bin/flink run -flip6 ` (this will print the 
job ID)
- stop/cancel the job using `./bin/flink [cancel|stop] -flip6 `
- check logs that the job was successfully canceled

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7072

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4742.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4742


commit dab4a51434f4386bfa9c7c5bfc45a8875ede9302
Author: zentol 
Date:   2017-09-20T12:52:56Z

[hotfix] [REST] Extend empty request/parameters support

commit c5b1b734fe117b3a2c8645c7be17de41df4daeb0
Author: zentol 
Date:   2017-09-20T12:57:56Z

[hotfix] [REST] Fix error message if empty request does not conform to 
RequestBody spec

commit 6a38409f87e0429dd2dccfeca93d79d9c9b9a858
Author: zentol 
Date:   2017-09-20T13:00:01Z

[hotfix] [REST] Add special handling for plain-text responses

commit d64cddd66308e125ed338214aa7c1e719f3548d4
Author: zentol 
Date:   2017-09-20T12:58:42Z

[hotfix] [REST] Add utility HandlerRequest constructor

commit c9427017f12ee67a7395fd669f968dfd3af2198c
Author: zentol 
Date:   2017-09-20T12:55:46Z

[FLINK-7072] [REST] Define protocol for job submit/cancel/stop

commit 02dbf5b644abb81cbb25531ad1cb012ef21a8420
Author: zentol 
Date:   2017-09-28T08:49:39Z

[FLINK-7072] [REST] Extend Dispatcher

commit 928171fbe56f42b297ce0ba1bb0566288a1d8383
Author: zentol 
Date:   2017-09-20T12:59:42Z

[FLINK-7072] [REST] Add handlers for job submit/cancel/stop

commit 44bdf5f778ebabcc9ca49fd833e184d21dc10634
Author: zentol 
Date:   2017-09-28T09:17:54Z

[FLINK-7072] [REST] CLI integration




---