[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2018-07-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556256#comment-16556256
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

EronWright commented on issue #4767: [FLINK-7738] [flip-6] Create WebSocket 
handler (server, client)
URL: https://github.com/apache/flink/pull/4767#issuecomment-407890658
 
 
   @tillrohrmann closing this due to inactivity.  Ping me if you want me to 
take another crack at it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Create WebSocket handler (server)
> -
>
> Key: FLINK-7738
> URL: https://issues.apache.org/jira/browse/FLINK-7738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>  Labels: pull-request-available
>
> An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2018-07-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556257#comment-16556257
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

EronWright closed pull request #4767: [FLINK-7738] [flip-6] Create WebSocket 
handler (server, client)
URL: https://github.com/apache/flink/pull/4767
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index 4808781c7b8..cf465294f5a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -29,17 +29,24 @@
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
+import java.util.HashMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -137,6 +144,35 @@ public void testRedirectHandler() throws Exception {
}
}
 
+   /**
+* Tests the approach of using the redirect handler as a standalone 
handler.
+*/
+   @Test
+   public void testUserEvent() {
+   final String correctAddress = "foobar:21345";
+   final CompletableFuture localAddressFuture = 
CompletableFuture.completedFuture(correctAddress);
+   final Time timeout = Time.seconds(10L);
+
+   final RestfulGateway localGateway = mock(RestfulGateway.class);
+   
when(localGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(correctAddress));
+   final GatewayRetriever gatewayRetriever = 
mock(GatewayRetriever.class);
+   
when(gatewayRetriever.getNow()).thenReturn(Optional.of(localGateway));
+
+   final RedirectHandler redirectHandler = new 
RedirectHandler<>(
+   localAddressFuture,
+   gatewayRetriever,
+   timeout);
+   final UserEventHandler eventHandler = new UserEventHandler();
+   EmbeddedChannel channel = new EmbeddedChannel(redirectHandler, 
eventHandler);
+
+   // write a (routed) HTTP request, then validate that a user 
event was propagated
+   DefaultFullHttpRequest request = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
+   Routed routed = new Routed(null, false, request, "/", new 
HashMap<>(), new HashMap<>());
+   channel.writeInbound(routed);
+   Assert.assertNotNull(eventHandler.gateway);
+   Assert.assertNotNull(eventHandler.routed);
+   }
+
private static class TestingHandler extends 
RedirectHandler {
 
protected TestingHandler(
@@ -154,4 +190,25 @@ protected void respondAsLeader(ChannelHandlerContext 
channelHandlerContext, Rout
}
}
 
+   private static class UserEventHandler extends 
ChannelInboundHandlerAdapter {
+
+   public volatile T gateway;
+
+   public volatile Routed routed;
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void userEventTriggered(ChannelHandlerContext ctx, 
Object evt) throws Exception {
+   if (evt instanceof RedirectHandler.GatewayRetrieved) {
+   gateway = 
((RedirectHandler.GatewayRetrieved) evt).getGateway();
+   }
+   

[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406457#comment-16406457
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
@tillrohrmann do you need a websocket yet?




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326322#comment-16326322
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4767
  
Hi @EronWright, yes, I think we still need support for web sockets. The 
first REST-based client won't use this, but later on we should definitely add 
this functionality. At the moment we are working hard to bring Flip-6 to 
feature parity with the old distributed architecture, so we couldn't make 
progress here. Once that is done, we should revisit this PR.




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324419#comment-16324419
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
@tillrohrmann are you still interested in this websocket code for the REST 
server?   Aside from rebasing, any 'must fix' issues here?




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202661#comment-16202661
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
I don't much like the use of `RequestBody` and `ResponseBody` here, or even 
that the WebSocket distinguishes between client and server messages.  Honestly, 
a `MessageBody` marker interface may suffice.  WDYT?
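
To make the `MessageBody` idea concrete, here is a minimal sketch in plain Java (no Netty, no Flink). Everything in it is hypothetical and invented for illustration: `MessageBody` as a single marker interface, `TypedSocket`, `LoopbackEndpoint`, and the `Subscribe` / `JobEvent` message types. It only shows that one interface, parameterized by inbound and outbound type, can describe both the client and the server end of a socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical marker for any message that travels over the socket, in either direction. */
interface MessageBody {}

/** Hypothetical client-to-server message. */
final class Subscribe implements MessageBody {
    final String jobId;
    Subscribe(String jobId) { this.jobId = jobId; }
}

/** Hypothetical server-to-client message. */
final class JobEvent implements MessageBody {
    final String description;
    JobEvent(String description) { this.description = description; }
}

/** One end of a socket: receives I, sends O. The same interface serves client and server. */
interface TypedSocket<I extends MessageBody, O extends MessageBody> {
    void send(O message);
    void onMessage(Consumer<I> listener);
}

/** In-memory endpoint that hands messages straight to its peer; no networking involved. */
final class LoopbackEndpoint<I extends MessageBody, O extends MessageBody> implements TypedSocket<I, O> {
    private final List<Consumer<I>> listeners = new ArrayList<>();
    private LoopbackEndpoint<O, I> peer;

    /** Wires two endpoints together; the type parameters force the directions to mirror each other. */
    static <A extends MessageBody, B extends MessageBody> void connect(
            LoopbackEndpoint<A, B> left, LoopbackEndpoint<B, A> right) {
        left.peer = right;
        right.peer = left;
    }

    @Override
    public void send(O message) { peer.deliver(message); }

    @Override
    public void onMessage(Consumer<I> listener) { listeners.add(listener); }

    private void deliver(I message) {
        for (Consumer<I> listener : listeners) {
            listener.accept(message);
        }
    }
}

final class MessageBodyDemo {
    static String run() {
        LoopbackEndpoint<JobEvent, Subscribe> client = new LoopbackEndpoint<>();
        LoopbackEndpoint<Subscribe, JobEvent> server = new LoopbackEndpoint<>();
        LoopbackEndpoint.connect(client, server);

        StringBuilder log = new StringBuilder();
        // The server answers every subscription with a single event.
        server.onMessage(sub -> server.send(new JobEvent("subscribed to " + sub.jobId)));
        client.onMessage(evt -> log.append(evt.description));

        client.send(new Subscribe("job-1"));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this shape, neither end needs to know whether a message counts as a "request" or a "response"; the direction is carried by the type parameters alone.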




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202468#comment-16202468
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r144381682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractWebSocketHandler.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.WebSocketSpecification;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A channel handler for WebSocket resources.
+ *
+ * This handler handles handshaking and ongoing messaging with a 
WebSocket client,
+ * based on a {@link WebSocketSpecification} that describes the REST 
resource location,
+ * parameter type, and message inbound/outbound types.  Messages are 
automatically
+ * encoded from (and decoded to) JSON text.
+ *
+ * Subclasses should override the following methods to extend the 
respective phases.
+ * 
+ * {@code handshakeInitiated} - occurs upon receipt of a handshake 
request from an HTTP client.  Useful for parameter validation.
+ * {@code handshakeCompleted} - occurs upon successful completion; 
WebSocket is ready for I/O.
+ * {@code messageReceived}: occurs when a WebSocket message is 
received on the channel.
+ * 
+ *
+ * The handler supports gateway availability announcements.
+ *
+ * @param  The gateway type.
+ * @param  The REST parameter type.
+ * @param  The outbound message type.
+ * @param  The inbound message type.
+ */
+public abstract class AbstractWebSocketHandler 
extends ChannelInboundHandlerAdapter {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   private final RedirectHandler redirectHandler;
+
+   private final AttributeKey gatewayAttr;
+
+   private final WebSocketSpecification specification;
+
+   private final ChannelHandler messageCodec;
+
+   private final AttributeKey parametersAttr;
+
+   /**
+* Creates a new handler.
+*/
+   public AbstractWebSocketHandler(
+   @Nonnull CompletableFuture localAddressFuture,
+   @Nonnull GatewayRetriever leaderRetriever,

[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200785#comment-16200785
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
Updated the description based on the latest PR.




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191474#comment-16191474
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142712872
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.websocket;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+
+/**
+ * A WebSocket for sending and receiving messages.
+ */
+public interface WebSocket {
+
+   /**
+* Adds a listener for websocket messages.
+*/
+   void addListener(WebSocketListener listener);
+
+   /**
+* Sends a message.
+*/
+   ChannelFuture send(ResponseBody message);
--- End diff --

Good catch




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191468#comment-16191468
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142712091
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 ---
@@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() 
{
}
}
 
+   private static class TestWebSocketOperation {
+
+   private static class WsParameters extends MessageParameters {
+   private final JobIDPathParameter jobIDPathParameter = 
new JobIDPathParameter();
+
+   @Override
+   public Collection 
getPathParameters() {
+   return 
Collections.singleton(jobIDPathParameter);
+   }
+
+   @Override
+   public Collection 
getQueryParameters() {
+   return Collections.emptyList();
+   }
+   }
+
+   static class WsHeaders implements 
MessageHeaders {
+
+   @Override
+   public HttpMethodWrapper getHttpMethod() {
+   return HttpMethodWrapper.GET;
+   }
+
+   @Override
+   public String getTargetRestEndpointURL() {
+   return "/test/:jobid/subscribe";
+   }
+
+   @Override
+   public Class getRequestClass() {
+   return EmptyRequestBody.class;
+   }
+
+   @Override
+   public Class 
getResponseClass() {
+   return WebSocketUpgradeResponseBody.class;
+   }
+
+   @Override
+   public HttpResponseStatus getResponseStatusCode() {
+   return HttpResponseStatus.OK;
+   }
+
+   @Override
+   public WsParameters getUnresolvedMessageParameters() {
+   return new WsParameters();
+   }
+   }
+
+   static class WsRestHandler extends 
AbstractRestHandler {
+
+   private final TestEventProvider eventProvider;
+
+   WsRestHandler(
+   CompletableFuture localAddressFuture,
+   GatewayRetriever 
leaderRetriever,
+   TestEventProvider eventProvider,
+   Time timeout) {
+   super(localAddressFuture, leaderRetriever, 
timeout, new WsHeaders());
+   this.eventProvider = eventProvider;
+   }
+
+   @Override
+   protected 
CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull RestfulGateway 
gateway) throws RestHandlerException {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   Assert.assertEquals(PATH_JOB_ID, jobID);
+   ChannelHandler messageHandler = new 
WsMessageHandler(eventProvider, jobID);
--- End diff --

The main value of `AbstractRestHandler` in this scenario is in decoding the 
HTTP request into a `HandlerRequest`.   By factoring that code into a 
`MessageToMessageDecoder` we could reuse it and avoid the need for 
`AbstractRestHandler` in this scenario.

In other words, the 'Netty way' would be to use a pipeline of handlers, 
which is more flexible than an inheritance hierarchy in my opinion.

Normal operation: `[HttpCodec] -> [RestRequestDecoder] -> [RestHandler]`
WebSocket operation: `[HttpCodec] -> [RestRequestDecoder] -> [WebSocketHandler]`

We could go further by encapsulating each operation in a handler that 
simply adds the appropriate child handlers, similar to how `HttpCodec` simply 
adds an encoder and decoder to the pipeline.   WDYT?
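
The two pipelines above can be sketched without Netty to show the reuse being argued for. This is a toy model only: `ToyPipeline`, `DecodedRequest`, and `PipelineSketch` are hypothetical names, the stages are plain `Function` objects rather than Netty `ChannelHandler`s, and the decoder merely pulls the job id out of a path shaped like the test URL `/test/:jobid/subscribe`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy stand-in for a channel pipeline: each stage transforms the message and passes it on. */
final class ToyPipeline {
    private final List<Function<Object, Object>> stages = new ArrayList<>();

    ToyPipeline addLast(Function<Object, Object> stage) {
        stages.add(stage);
        return this;
    }

    Object fire(Object message) {
        Object current = message;
        for (Function<Object, Object> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }
}

/** Hypothetical decoded request, loosely analogous to a HandlerRequest. */
final class DecodedRequest {
    final String jobId;
    DecodedRequest(String jobId) { this.jobId = jobId; }
}

final class PipelineSketch {
    // Shared stage: turns a raw path such as "/test/42/subscribe" into a DecodedRequest.
    static final Function<Object, Object> REQUEST_DECODER =
        raw -> new DecodedRequest(((String) raw).split("/")[2]);

    // Terminal stage for ordinary REST operations.
    static final Function<Object, Object> REST_HANDLER =
        req -> "200 OK for job " + ((DecodedRequest) req).jobId;

    // Terminal stage for WebSocket operations.
    static final Function<Object, Object> WEBSOCKET_HANDLER =
        req -> "101 Switching Protocols for job " + ((DecodedRequest) req).jobId;

    static String rest(String path) {
        return (String) new ToyPipeline().addLast(REQUEST_DECODER).addLast(REST_HANDLER).fire(path);
    }

    static String webSocket(String path) {
        return (String) new ToyPipeline().addLast(REQUEST_DECODER).addLast(WEBSOCKET_HANDLER).fire(path);
    }

    public static void main(String[] args) {
        System.out.println(rest("/test/42/subscribe"));
        System.out.println(webSocket("/test/42/subscribe"));
    }
}
```

The decoder is written once and composed into both pipelines; neither terminal handler has to inherit from a common base class to get request decoding.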





[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191413#comment-16191413
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142700416
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}
}
+
+   public , U extends MessageParameters, R extends 
ResponseBody> CompletableFuture sendWebSocketRequest(String 
targetAddress, int targetPort, M messageHeaders, U messageParameters, Class 
messageClazz, WebSocketListener... listeners) throws IOException {
--- End diff --

I too was unhappy about using a special response body, but felt that the 
alternative required some rework of the REST handler that was best done in a 
follow-up.  With some rework we can eliminate the funky response body.




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191393#comment-16191393
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142696121
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -157,7 +160,12 @@ protected void respondAsLeader(final 
ChannelHandlerContext ctx, Routed routed, T

HandlerUtils.sendErrorResponse(ctx, httpRequest, new 
ErrorResponseBody("Internal server error."), 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
} else {
-   HandlerUtils.sendResponse(ctx, 
httpRequest, resp, messageHeaders.getResponseStatusCode());
+   if (resp instanceof 
WebSocketUpgradeResponseBody) {
+   upgradeToWebSocket(ctx, routed, 
(WebSocketUpgradeResponseBody) resp);
--- End diff --

The REST handler is not active after the upgrade is complete, and it would 
be harmless to remove it from the pipeline.  The message handler takes over, 
reading and writing websocket frames based on typed messages.
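
As a rough illustration of that hand-over, the following plain-Java sketch models a pipeline as a list of stages. It is not the PR's code: `Stage`, `FrameHandler`, `UpgradingRestHandler`, and `UpgradeSketch` are hypothetical, and unlike the PR (which leaves the REST handler in place) the sketch actually swaps the REST handler out on upgrade, to show that doing so changes nothing for later messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy handler: receives a message plus the pipeline it lives in, so it can rewire it. */
interface Stage {
    String handle(String message, List<Stage> pipeline);
}

/** Takes over after the upgrade and treats every message as a WebSocket frame. */
final class FrameHandler implements Stage {
    @Override
    public String handle(String message, List<Stage> pipeline) {
        return "frame:" + message;
    }
}

/** Answers ordinary requests; on an upgrade request it replaces itself with a FrameHandler. */
final class UpgradingRestHandler implements Stage {
    @Override
    public String handle(String message, List<Stage> pipeline) {
        if (message.equals("UPGRADE")) {
            pipeline.set(pipeline.indexOf(this), new FrameHandler());
            return "101 Switching Protocols";
        }
        return "200 OK";
    }
}

final class UpgradeSketch {
    static List<String> run() {
        List<Stage> pipeline = new ArrayList<>();
        pipeline.add(new UpgradingRestHandler());

        List<String> responses = new ArrayList<>();
        for (String message : new String[] {"GET", "UPGRADE", "hello"}) {
            // Always dispatch to whatever handler currently sits at the tail.
            Stage tail = pipeline.get(pipeline.size() - 1);
            responses.add(tail.handle(message, pipeline));
        }
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In Netty itself the equivalent step would go through the channel's pipeline rather than a list, but the ordering is the same: HTTP handling first, then the message handler owns the connection.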




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191130#comment-16191130
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142641670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.websocket;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+
+/**
+ * A WebSocket for sending and receiving messages.
+ */
+public interface WebSocket {
+
+   /**
+* Adds a listener for websocket messages.
+*/
+   void addListener(WebSocketListener listener);
+
+   /**
+* Sends a message.
+*/
+   ChannelFuture send(ResponseBody message);
--- End diff --

My understanding is that the WebSocket interface is only used on the 
client, so shouldn't this be typed to `RequestBody`?




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191131#comment-16191131
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142640401
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}
}
+
+   public , U extends MessageParameters, R extends 
ResponseBody> CompletableFuture sendWebSocketRequest(String 
targetAddress, int targetPort, M messageHeaders, U messageParameters, Class 
messageClazz, WebSocketListener... listeners) throws IOException {
+   Preconditions.checkNotNull(targetAddress);
+   Preconditions.checkArgument(0 <= targetPort && targetPort < 
65536, "The target port " + targetPort + " is not in the range [0, 65536).");
+   Preconditions.checkNotNull(messageHeaders);
+   Preconditions.checkNotNull(messageParameters);
+   Preconditions.checkState(messageParameters.isResolved(), 
"Message parameters were not resolved.");
+
+   String targetUrl = 
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), 
messageParameters);
+   URI webSocketURL = URI.create("ws://" + targetAddress + ":" + 
targetPort).resolve(targetUrl);
+   LOG.debug("Sending WebSocket request to {}", webSocketURL);
+
+   final HttpHeaders headers = new DefaultHttpHeaders()
+   .add(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE);
+
+   Bootstrap bootstrap1 = bootstrap.clone().handler(new 
ClientBootstrap() {
+   @Override
+   protected void initChannel(SocketChannel channel) 
throws Exception {
+   super.initChannel(channel);
+   channel.pipeline()
+   .addLast(new 
WebSocketClientProtocolHandler(webSocketURL, WebSocketVersion.V13, null, false, 
headers, 65535))
+   .addLast(new WsResponseHandler(channel, 
messageClazz, listeners));
+   }
+   });
+
+   return CompletableFuture.supplyAsync(() -> 
bootstrap1.connect(targetAddress, targetPort), executor)
+   .thenApply((channel) -> {
+   try {
+   return channel.sync();
+   } catch (InterruptedException e) {
+   throw new FlinkRuntimeException(e);
+   }
+   })
+   .thenApply((ChannelFuture::channel))
+   .thenCompose(channel -> {
+   WsResponseHandler handler = 
channel.pipeline().get(WsResponseHandler.class);
+   return handler.getWebSocketFuture();
+   });
+   }
+
+   private static class WsResponseHandler extends 
SimpleChannelInboundHandler implements WebSocket {
+
+   private final Channel channel;
+   private final Class messageClazz;
+   private final List listeners = new 
CopyOnWriteArrayList<>();
+
+   private final CompletableFuture webSocketFuture = 
new CompletableFuture<>();
+
+   CompletableFuture getWebSocketFuture() {
+   return webSocketFuture;
+   }
+
+   public WsResponseHandler(Channel channel, Class messageClazz, WebSocketListener[] listeners) {
+   this.channel = channel;
+   this.messageClazz = messageClazz;
+   this.listeners.addAll(Arrays.asList(listeners));
+   }
+
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, 
Throwable cause) throws Exception {
+   LOG.warn("WebSocket exception", cause);
+   webSocketFuture.completeExceptionally(cause);
+   }
+
+   @Override
+   public void userEventTriggered(ChannelHandlerContext ctx, 
Object evt) throws Exception {
+   if (evt instanceof 
WebSocketClientProtocolHandler.ClientHandshakeStateEvent) {
+   
WebSocketClientProtocolHandler.ClientHandshakeStateEvent wsevt = 
(WebSocketClientProtocolHandler.ClientHandshakeStateEvent) evt;
+   switch(wsevt) {
--- End diff --

missing space after switch



[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191129#comment-16191129
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142637674
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocketListener.java
 ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.websocket;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.util.event.EventListener;
+
+/**
+ * A listener for WebSocket messages.
+ */
+public interface WebSocketListener extends EventListener { }
--- End diff --

I would add a proper type parameter. Currently every implementation would 
be forced to do instanceof+cast checks.
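
A minimal sketch of what a type parameter would buy here. The names `TypedWebSocketListener` and `JobStatusMessage` are hypothetical stand-ins chosen for illustration; they are not taken from the pull request or from Flink's `EventListener`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a typed listener; not Flink's actual interface.
interface TypedWebSocketListener<T> {
    void onEvent(T event);
}

// Hypothetical message type used only for this illustration.
final class JobStatusMessage {
    final String status;

    JobStatusMessage(String status) {
        this.status = status;
    }
}

class TypedListenerSketch {

    // Feeds every message to a typed listener; no instanceof check or cast is needed.
    static List<String> collectStatuses(List<JobStatusMessage> messages) {
        List<String> seen = new ArrayList<>();
        TypedWebSocketListener<JobStatusMessage> listener = msg -> seen.add(msg.status);
        for (JobStatusMessage message : messages) {
            listener.onEvent(message);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<JobStatusMessage> input = new ArrayList<>();
        input.add(new JobStatusMessage("RUNNING"));
        input.add(new JobStatusMessage("FINISHED"));
        System.out.println(collectStatuses(input));
    }
}
```

With an untyped listener the lambda body would receive `Object` and have to check and cast before reading `status`.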


> Create WebSocket handler (server)
> -
>
> Key: FLINK-7738
> URL: https://issues.apache.org/jira/browse/FLINK-7738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191132#comment-16191132
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142639095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.websocket;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+
+/**
+ * A WebSocket for sending and receiving messages.
+ */
+public interface WebSocket {
--- End diff --

Similar to the WebSocketListener, this should have a type parameter.




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191100#comment-16191100
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142634775
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 ---
@@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() 
{
}
}
 
+   private static class TestWebSocketOperation {
+
+   private static class WsParameters extends MessageParameters {
+   private final JobIDPathParameter jobIDPathParameter = 
new JobIDPathParameter();
+
+   @Override
+   public Collection 
getPathParameters() {
+   return 
Collections.singleton(jobIDPathParameter);
+   }
+
+   @Override
+   public Collection 
getQueryParameters() {
+   return Collections.emptyList();
+   }
+   }
+
+   static class WsHeaders implements 
MessageHeaders {
+
+   @Override
+   public HttpMethodWrapper getHttpMethod() {
+   return HttpMethodWrapper.GET;
+   }
+
+   @Override
+   public String getTargetRestEndpointURL() {
+   return "/test/:jobid/subscribe";
+   }
+
+   @Override
+   public Class getRequestClass() {
+   return EmptyRequestBody.class;
+   }
+
+   @Override
+   public Class 
getResponseClass() {
+   return WebSocketUpgradeResponseBody.class;
+   }
+
+   @Override
+   public HttpResponseStatus getResponseStatusCode() {
+   return HttpResponseStatus.OK;
+   }
+
+   @Override
+   public WsParameters getUnresolvedMessageParameters() {
+   return new WsParameters();
+   }
+   }
+
+   static class WsRestHandler extends 
AbstractRestHandler {
+
+   private final TestEventProvider eventProvider;
+
+   WsRestHandler(
+   CompletableFuture localAddressFuture,
+   GatewayRetriever 
leaderRetriever,
+   TestEventProvider eventProvider,
+   Time timeout) {
+   super(localAddressFuture, leaderRetriever, 
timeout, new WsHeaders());
+   this.eventProvider = eventProvider;
+   }
+
+   @Override
+   protected 
CompletableFuture handleRequest(@Nonnull 
HandlerRequest request, @Nonnull RestfulGateway 
gateway) throws RestHandlerException {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   Assert.assertEquals(PATH_JOB_ID, jobID);
+   ChannelHandler messageHandler = new 
WsMessageHandler(eventProvider, jobID);
--- End diff --

If this is what an AbstractRestHandler implementation for WebSockets would 
actually look like, I'm questioning the benefit of implementing it as an 
AbstractRestHandler in the first place.

An explicit AbstractWebSocketRestHandler class could have an abstract 
`initializeWebSocket(HandlerRequest ...)` method instead of hacking it into 
`AbstractRestHandler#handleRequest`, and a separate method for creating the 
event provider that is typed to the actual response we're sending back.
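
A rough sketch of the shape such a base class could take, assuming heavily simplified types. `OutboundSocket`, `AbstractWebSocketRestHandlerSketch`, `onUpgradeComplete` and `JobSubscribeHandlerSketch` are hypothetical names for illustration only; the real class would work with `HandlerRequest` and Netty channels, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an outbound WebSocket; not a Flink or Netty type.
interface OutboundSocket<O> {
    void send(O message);
}

// Sketch of the suggested base class: P is the parsed request, O the outbound message type.
abstract class AbstractWebSocketRestHandlerSketch<P, O> {

    // Subclasses set up the socket here instead of going through handleRequest.
    protected abstract void initializeWebSocket(P request, OutboundSocket<O> socket);

    // Framework side: in a real handler the protocol upgrade would happen before this call.
    final void onUpgradeComplete(P request, OutboundSocket<O> socket) {
        initializeWebSocket(request, socket);
    }
}

class JobSubscribeHandlerSketch extends AbstractWebSocketRestHandlerSketch<String, String> {

    @Override
    protected void initializeWebSocket(String jobId, OutboundSocket<String> socket) {
        socket.send("subscribed:" + jobId);
    }

    // Drives the handler with an in-memory socket and returns what was sent.
    static List<String> run(String jobId) {
        List<String> sent = new ArrayList<>();
        new JobSubscribeHandlerSketch().onUpgradeComplete(jobId, sent::add);
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run("job-1"));
    }
}
```

The point of the sketch is only that the outbound message type `O` is fixed by the subclass, so nothing needs to be smuggled through a generic response body.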




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191099#comment-16191099
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142633088
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -157,7 +160,12 @@ protected void respondAsLeader(final 
ChannelHandlerContext ctx, Routed routed, T

HandlerUtils.sendErrorResponse(ctx, httpRequest, new 
ErrorResponseBody("Internal server error."), 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
} else {
-   HandlerUtils.sendResponse(ctx, 
httpRequest, resp, messageHeaders.getResponseStatusCode());
+   if (resp instanceof 
WebSocketUpgradeResponseBody) {
+   upgradeToWebSocket(ctx, routed, 
(WebSocketUpgradeResponseBody) resp);
--- End diff --

Help me out here: after the upgrade is complete, which parts of the 
AbstractRestHandler class are still used?




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191101#comment-16191101
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142634073
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}
}
+
+   public , U extends MessageParameters, R extends 
ResponseBody> CompletableFuture sendWebSocketRequest(String 
targetAddress, int targetPort, M messageHeaders, U messageParameters, Class 
messageClazz, WebSocketListener... listeners) throws IOException {
--- End diff --

I really dislike how the `WebSocketUpgradeResponseBody` is defined as the 
response in the headers. Not only is this not the actual response we're getting 
back (that would be R), we now also introduce an arbitrary response type, which 
voids the type safety and prevents us from auto-generating proper documentation.

The MessageHeaders are very much a high-level, user-facing specification, 
but here we're using them for the setup of the websockets, which is a 
relatively low-level affair.




[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16190784#comment-16190784
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/4767

[FLINK-7738] [flip-6] Create WebSocket handler (server, client)


## What is the purpose of the change

Introduces WebSocket support for the FLIP-6 REST server and client.

The basic idea is to use the normal REST handler to initiate a websocket 
upgrade. In this way, the normal request parsing logic may be used. For 
example, a REST method of `/jobs/:jobid/subscribe` may be developed using a 
normal REST handler. The handler responds such that the server initiates the 
upgrade procedure rather than producing a normal REST response. A new type of 
handler based on `AbstractWebSocketMessageHandler` is then installed into the 
pipeline for subsequent interaction.

Netty's `ChannelGroup` is leveraged to act as an event bus to easily 
dispatch a message to one or more channels based on a routing key.  In the 
above example, the routing key might be `jobid`, meaning that a given channel 
is listening to events related to a certain job.   It is expected that a 
concrete subclass of `RestServerEndpoint` create one or more 
`KeyedChannelRouter` instances as needed for its handlers, and then write 
messages as it sees fit.
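
The routing idea can be sketched without Netty as follows. This is an illustration under stated assumptions, not the `KeyedChannelRouter` from this pull request: `KeyedRouterSketch` is a hypothetical name, a `Consumer<String>` stands in for a channel, and a plain map stands in for the `ChannelGroup`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch; a Consumer<String> stands in for a Netty channel.
class KeyedRouterSketch<K> {

    // Channels grouped by routing key, e.g. a job id.
    private final Map<K, List<Consumer<String>>> channelsByKey = new HashMap<>();

    // Registers a channel as interested in messages for the given key.
    void register(K key, Consumer<String> channel) {
        channelsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(channel);
    }

    // Delivers the message to every channel registered under the key
    // and returns how many channels received it.
    int write(K key, String message) {
        List<Consumer<String>> targets =
            channelsByKey.getOrDefault(key, Collections.emptyList());
        for (Consumer<String> channel : targets) {
            channel.accept(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        KeyedRouterSketch<String> router = new KeyedRouterSketch<>();
        List<String> received = new ArrayList<>();
        router.register("job-a", received::add);
        router.write("job-a", "status changed");
        router.write("job-b", "not delivered to job-a listeners");
        System.out.println(received);
    }
}
```

The sketch is not thread-safe and never unregisters closed channels; a version built on Netty's `ChannelGroup` would presumably get the latter for free, since a group drops channels when they close.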

The client was similarly adapted to open a `WebSocket` with associated 
listeners.  Consider the work to be a stop-gap pending further discussion.

The `RestEndpointITCase` test was enhanced with an end-to-end 
demonstration.   A separate unit test for `AbstractRestHandler` was also 
introduced.

## Brief change log
- Introduce `AbstractWebSocketMessageHandler` to handle inbound and 
outbound websocket messages.
- Introduce `WebSocketUpgradeResponseBody` as a special REST response that 
triggers a websocket upgrade.
- Update `AbstractRestHandler` to handle websocket upgrades.
- Introduce `KeyedChannelRouter` to route websocket messages to interested 
channels.
- Update `RestClient` with a new method, `sendWebSocketRequest`.
- Introduce `WebSocket` and `WebSocketListener`.
- Update `RestEndpointITCase` with end-to-end websocket test.

## Verifying this change

This change added tests and can be verified as follows:
- `AbstractRestHandlerTest`
- `RestEndpointITCase`
- `AbstractWebSocketMessageHandlerTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-7738-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4767.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4767


commit f56168846731ab4205a2b04a42285e0b3a3f1972
Author: Wright, Eron 
Date:   2017-10-04T00:26:56Z

[FLINK-7738] [flip-6] Create WebSocket handler (server, client)



