[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407965#comment-16407965
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5573


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407636#comment-16407636
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Changes look good @yanghua. Merging this PR.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406509#comment-16406509
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@tillrohrmann thanks for your reply, it's a misoperation, I have done 
rollback and refactored the code based on your suggestion. Please check the 
change again.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406499#comment-16406499
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Not sure what exactly happened but you can always go back to an earlier
point via the reflog and then a git hard reset.

On Tue, Mar 20, 2018 at 4:11 PM, vinoyang  wrote:

> hi @tillrohrmann  it seems I make a
> wrong git operation, I merged some new commits and squashed into one, then
> pushed into the PR(branch). What should I do to fix this problem?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> , or 
mute
> the thread
> 

> .
>



> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406473#comment-16406473
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
hi @tillrohrmann it seems I make a wrong git operation, I merged some new 
commits and squashed into one, then pushed into the PR(branch). What should I 
do to fix this problem?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406263#comment-16406263
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175750002
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
 ---
@@ -34,6 +36,10 @@
 
private static final long serialVersionUID = 1L;
 
+   public SerializedValueDeserializer() {
--- End diff --

Alright, this makes sense.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406246#comment-16406246
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175745203
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
 ---
@@ -34,6 +36,10 @@
 
private static final long serialVersionUID = 1L;
 
+   public SerializedValueDeserializer() {
--- End diff --

this is because the `SerializedValueDeserializer ` would be used in a 
jackson annotation `@JsonDeserialize`


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406236#comment-16406236
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175741738
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
}
}
 
+   @Test
+   public void testGetAccumulators() throws Exception {
+   TestAccumulatorHandlers accumulatorHandlers = new 
TestAccumulatorHandlers();
+   TestAccumulatorHandlers.TestAccumulatorHandler 
accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
+
+   try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(accumulatorHandler)){
+
+   JobID id = new JobID();
+
+   {
+   Map accumulators = 
restClusterClient.getAccumulators(id);
+   assertNotNull(accumulators);
+   assertEquals(1, accumulators.size());
+
+   assertEquals(true, 
accumulators.containsKey("testKey"));
+   assertEquals("testValue", 
accumulators.get("testKey").toString());
+   }
+   }
+   }
+
+   private class TestAccumulatorHandlers  {
--- End diff --

this follows the `TestSavepointHandlers` in `RestClusterClientTest`, I will 
remove it.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406209#comment-16406209
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175735166
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
 ---
@@ -62,18 +64,33 @@ public JobAccumulatorsHandler(
}
 
@Override
-   protected JobAccumulatorsInfo 
handleRequest(HandlerRequest request, 
AccessExecutionGraph graph) throws RestHandlerException {
-   StringifiedAccumulatorResult[] accs = 
graph.getAccumulatorResultsStringified();
-   List 
userTaskAccumulators = new ArrayList<>(accs.length);
+   protected JobAccumulatorsInfo 
handleRequest(HandlerRequest request, AccessExecutionGraph graph) throws 
RestHandlerException {
+   JobAccumulatorsInfo accumulatorsInfo;
+   List queryParams = 
request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
 
-   for (StringifiedAccumulatorResult acc : accs) {
+   boolean includeSerializedValue = false;
--- End diff --

let's make it `final` and assign `false` in the else branch.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406204#comment-16406204
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175733976
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(JobID jobID) throws 
Exception {
+   return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
+   }
+
+   @Override
+   public Map getAccumulators(final JobID jobID, 
ClassLoader loader) throws Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   try {
+   return 
AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(),
 loader);
+   } catch (Exception e) {
+   log.error("Deserialize accumulators 
with customized classloader error : {}", e);
+   }
+   }
+
+   return Collections.EMPTY_MAP;
--- End diff --

please replace with `Collections.emptyMap()`


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406214#comment-16406214
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175735579
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
 ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * request parameter for job accumulator's handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
--- End diff --

Capital letter


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406213#comment-16406213
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175735982
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
 ---
@@ -34,6 +36,10 @@
 
private static final long serialVersionUID = 1L;
 
+   public SerializedValueDeserializer() {
--- End diff --

Why do we add this default constructor? I prefer to specify the type 
explicitly. Also if it is a simple `TypeReference`.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406210#comment-16406210
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175734974
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
}
}
 
+   @Test
+   public void testGetAccumulators() throws Exception {
+   TestAccumulatorHandlers accumulatorHandlers = new 
TestAccumulatorHandlers();
+   TestAccumulatorHandlers.TestAccumulatorHandler 
accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
+
+   try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(accumulatorHandler)){
+
+   JobID id = new JobID();
+
+   {
+   Map accumulators = 
restClusterClient.getAccumulators(id);
+   assertNotNull(accumulators);
+   assertEquals(1, accumulators.size());
+
+   assertEquals(true, 
accumulators.containsKey("testKey"));
+   assertEquals("testValue", 
accumulators.get("testKey").toString());
+   }
+   }
+   }
+
+   private class TestAccumulatorHandlers  {
+
+   private class TestAccumulatorHandler extends 
TestHandler {
+
+   public TestAccumulatorHandler() {
+   super(JobAccumulatorsHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, 
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   JobAccumulatorsInfo accumulatorsInfo;
+   List queryParams = 
request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
+
+   boolean includeSerializedValue = false;
+   if (!queryParams.isEmpty()) {
+   includeSerializedValue = 
queryParams.get(0);
+   }
+
+   List 
userTaskAccumulators = new ArrayList() 
{{
--- End diff --

Instead of creating a new anonymous class can we simply create an 
`ArrayList` and then add the element (without the initializer block)?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406201#comment-16406201
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175733724
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(JobID jobID) throws 
Exception {
+   return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
+   }
+
+   @Override
+   public Map getAccumulators(final JobID jobID, 
ClassLoader loader) throws Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   try {
+   return 
AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(),
 loader);
+   } catch (Exception e) {
+   log.error("Deserialize accumulators 
with customized classloader error : {}", e);
--- End diff --

Let us properly fail with an exception here.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406205#comment-16406205
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175734525
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
}
}
 
+   @Test
+   public void testGetAccumulators() throws Exception {
+   TestAccumulatorHandlers accumulatorHandlers = new 
TestAccumulatorHandlers();
+   TestAccumulatorHandlers.TestAccumulatorHandler 
accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
+
+   try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(accumulatorHandler)){
+
+   JobID id = new JobID();
+
+   {
+   Map accumulators = 
restClusterClient.getAccumulators(id);
+   assertNotNull(accumulators);
+   assertEquals(1, accumulators.size());
+
+   assertEquals(true, 
accumulators.containsKey("testKey"));
+   assertEquals("testValue", 
accumulators.get("testKey").toString());
+   }
+   }
+   }
+
+   private class TestAccumulatorHandlers  {
+
+   private class TestAccumulatorHandler extends 
TestHandler {
+
+   public TestAccumulatorHandler() {
+   super(JobAccumulatorsHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, 
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
--- End diff --

parameter breaking not consistent


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406207#comment-16406207
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175735270
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
--- End diff --

Please start with a capital letter.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406203#comment-16406203
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175733195
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(JobID jobID) throws 
Exception {
+   return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
+   }
--- End diff --

Should not be necessary to override.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406211#comment-16406211
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175736246
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(JobID jobID) throws 
Exception {
+   return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
+   }
+
+   @Override
+   public Map getAccumulators(final JobID jobID, 
ClassLoader loader) throws Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
--- End diff --

`accumulatorsInfo` should always be null. The same applies to 
`getSerializedUserAccumulators`. Thus there is no need for a null check.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406212#comment-16406212
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175736041
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
 ---
@@ -36,6 +38,10 @@
 
private static final long serialVersionUID = 1L;
 
+   public SerializedValueSerializer() {
+   super(TypeFactory.defaultInstance().constructType(new 
TypeReference() {}));
--- End diff --

Same here with the default constructor.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406206#comment-16406206
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175733933
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(JobID jobID) throws 
Exception {
+   return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
+   }
+
+   @Override
+   public Map getAccumulators(final JobID jobID, 
ClassLoader loader) throws Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   try {
+   return 
AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(),
 loader);
+   } catch (Exception e) {
+   log.error("Deserialize accumulators 
with customized classloader error : {}", e);
--- End diff --

Moreover, when logging an exception you don't have to specify a `{}` 
placeholder.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406208#comment-16406208
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175734630
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
}
}
 
+   @Test
+   public void testGetAccumulators() throws Exception {
+   TestAccumulatorHandlers accumulatorHandlers = new 
TestAccumulatorHandlers();
+   TestAccumulatorHandlers.TestAccumulatorHandler 
accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
+
+   try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(accumulatorHandler)){
+
+   JobID id = new JobID();
+
+   {
+   Map accumulators = 
restClusterClient.getAccumulators(id);
+   assertNotNull(accumulators);
+   assertEquals(1, accumulators.size());
+
+   assertEquals(true, 
accumulators.containsKey("testKey"));
+   assertEquals("testValue", 
accumulators.get("testKey").toString());
+   }
+   }
+   }
+
+   private class TestAccumulatorHandlers  {
+
+   private class TestAccumulatorHandler extends 
TestHandler {
+
+   public TestAccumulatorHandler() {
+   super(JobAccumulatorsHeaders.getInstance());
+   }
+
+   @Override
+   protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, 
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
+   JobAccumulatorsInfo accumulatorsInfo;
+   List queryParams = 
request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
+
+   boolean includeSerializedValue = false;
--- End diff --

let's make it final and assign false in the else branch


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406202#comment-16406202
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r175734460
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
}
}
 
+   @Test
+   public void testGetAccumulators() throws Exception {
+   TestAccumulatorHandlers accumulatorHandlers = new 
TestAccumulatorHandlers();
+   TestAccumulatorHandlers.TestAccumulatorHandler 
accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
+
+   try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(accumulatorHandler)){
+
+   JobID id = new JobID();
+
+   {
+   Map accumulators = 
restClusterClient.getAccumulators(id);
+   assertNotNull(accumulators);
+   assertEquals(1, accumulators.size());
+
+   assertEquals(true, 
accumulators.containsKey("testKey"));
+   assertEquals("testValue", 
accumulators.get("testKey").toString());
+   }
+   }
+   }
+
+   private class TestAccumulatorHandlers  {
--- End diff --

Why having this extra wrapping class?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392561#comment-16392561
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
please do not trigger builds just to get that perfect green build. The test 
failure here is quite common at the moment.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392545#comment-16392545
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
You can use `git commit -m "Trigger build" --allow-empty` to trigger a 
build.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391814#comment-16391814
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua This test failure is unrelated, you can ignore it.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390813#comment-16390813
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@zentol it seems the Travis CI has some problem, always build failed. 
Please review my latest change.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389174#comment-16389174
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * request parameter for job accumulator's handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsMessageParameters extends JobMessageParameters 
{
+
+   public final AccumulatorsIncludeSerializedValueQueryParameter 
queryParameter = new AccumulatorsIncludeSerializedValueQueryParameter();
--- End diff --

field name is a bit generic; how about `includeSerializedAccumulators`?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389172#comment-16389172
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172756524
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList(true));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null && 
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+   return 
accumulatorsInfo.getSerializedUserAccumulators();
--- End diff --

the accumulators should be deserialized via 
`SerializedValue#deserialize(ClassLoader)` .

If `Map getAccumulators(JobID jobID, ClassLoader loader)` 
(that also should be overridden) was called use the passed in `ClassLoader`, 
otherwise `ClassLoader.getSystemClassLoader()`.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389173#comment-16389173
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172755830
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
--- End diff --

we should also override `Map getAccumulators(JobID jobID, 
ClassLoader loader)`


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389161#comment-16389161
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
hi @zentol , I refactor the code based on your review suggestion. Would you 
please review again, thanks!


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388497#comment-16388497
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5573
  
We may want to delay merging this until 
[FLINK-8881](https://issues.apache.org/jira/browse/FLINK-8881) has been 
addressed, as it voids the primary use-case of this handler.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388496#comment-16388496
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172651895
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -389,6 +394,33 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobAccumulatorsMessageParameters accMsgParams = 
accumulatorsHeaders.getUnresolvedMessageParameters();
+   accMsgParams.jobPathParameter.resolve(jobID);
+   
accMsgParams.queryParameter.resolve(Collections.singletonList("true"));
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   accMsgParams
+   );
+
+   return responseFuture.thenApply((JobAccumulatorsInfo 
accumulatorsInfo) -> {
+   if (accumulatorsInfo != null) {
+   Map result = new HashMap<>(3);
+
+   
result.put(JobAccumulatorsInfo.FIELD_NAME_JOB_ACCUMULATORS, 
accumulatorsInfo.getJobAccumulators());
--- End diff --

The resulting API (as shown in `RestClusterClientTest`) is effectively not 
usable for users and inconsistent with existing behavior in `ClusterClient`.

The returned map should only contain the deserialized accumulators with 
their respective name as the key.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388491#comment-16388491
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172650031
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -33,19 +39,38 @@
 public class JobAccumulatorsInfo implements ResponseBody {
public static final String FIELD_NAME_JOB_ACCUMULATORS = 
"job-accumulators";
public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = 
"user-task-accumulators";
+   public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS 
= "serialized-user-task-accumulators";
 
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
private List jobAccumulators;
 
@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
private List userAccumulators;
 
+   @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
+   @JsonSerialize(contentUsing = SerializedValueSerializer.class)
+   private Map serializedUserAccumulators;
+
@JsonCreator
public JobAccumulatorsInfo(
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) 
List jobAccumulators,
-   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators) {
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) 
List userAccumulators,
+   @JsonDeserialize(contentUsing = 
SerializedValueDeserializer.class) 
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map serializedUserAccumulators) {
this.jobAccumulators = 
Preconditions.checkNotNull(jobAccumulators);
this.userAccumulators = 
Preconditions.checkNotNull(userAccumulators);
+   this.serializedUserAccumulators = 
Preconditions.checkNotNull(serializedUserAccumulators);
+   }
+
+   public List getJobAccumulators() {
--- End diff --

missing `@JsonIgnore` annotations


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388476#comment-16388476
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsQueryParameter extends 
MessageQueryParameter {
--- End diff --

The name is also a bit generic.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388474#comment-16388474
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * query parameter for job's accumulator handler {@link 
org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsQueryParameter extends 
MessageQueryParameter {
--- End diff --

Why not directly make this a boolean option?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388478#comment-16388478
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r172648802
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -104,6 +129,18 @@ public UserTaskAccumulator(
this.value = Preconditions.checkNotNull(value);
}
 
+   public String getName() {
--- End diff --

missing `@JsonIgnore` annotation (also applies to other getters


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384579#comment-16384579
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@tillrohrmann , if you have time could you please review this PR? Thanks.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378910#comment-16378910
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua I will take a look this week.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378093#comment-16378093
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
 @GJL I have refactored the code and test case, could you please review the 
new commit? Thanks.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377018#comment-16377018
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua 
I talked to @tillrohrmann offline, and we decided it is enough to add a 
query parameter such as `includeSerializedValue` to the 
`JobAccumulatorsHandler`. If `includeSerializedValue` is `true`, then 
`SerializedValue` should be part of the JSON response (in addition to the 
stringified value), otherwise only the stringified value. By default 
`includeSerializedValue` should be `false` because the Web UI cannot handle 
binary data.  For the request in `RestClusterClient` you would always set the 
flag to `true`.

Let me know if you have any questions.

cc: @tillrohrmann 






> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376974#comment-16376974
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@GJL , the before discussion between I and @aljoscha treat the accumulators 
as a whole (maybe only me?). Because the pre-existing implementation 
`getAccumulatorResultsStringified ` and `getAccumulatorsSerialized ` , so I 
took steps along this path and split the stringified and serialized 
accumulators.

So the conclusion is : accepting your opinion and taking the each key's 
SerializedValue into `UserTaskAccumulator` ? 

If yes, I will try to change the code.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376870#comment-16376870
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
I agree with @aljoscha. We should not add more `public` methods to 
`ClusterClient`. Implementing `getAccumulators()` in `RestClusterClient` is 
enough. 

Regarding the `JobAccumulatorsHandler`, what would speak against adding a 
new field to `UserTaskAccumulator` which stores the `SerializedValue`? The JSON 
representation would always carry an additional base64 encoded `byte` array but 
I think performance isn't important at this point.

cc: @tillrohrmann @aljoscha 


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376792#comment-16376792
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5573
  
I can't comment on the first points but on the last point. I would suggest 
to only add `getAccumulators()` to `RestClusterClient` that behaves the same 
way as the existing `getAccumulators()` on `ClusterClient`: it retrieves the 
serialised values and returns a map of the deserialised values.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376687#comment-16376687
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
OK, base of @aljoscha 's opinion, I describe the implementation detail 
before coding : 

* define a `AccumulatorRepresentationsQueryParameter` class which extends 
`MessageQueryParameter` and contains to two representation mode 
(***stringified***  and ***serialized***)

* define a `JobAccumulatorsMessageParameters` class which extends 
`MessageParameters`, and the `getQueryParameters` method will return the 
`AccumulatorRepresentationsQueryParameter`

* refactor `JobAccumulatorsHeaders#getUnresolvedMessageParameters` return 
`JobAccumulatorsMessageParameters`'s instance

* refactor `JobAccumulatorsHandler#handleRequest` it will query specific 
accumulator's representation base of 
`AccumulatorRepresentationsQueryParameter`, and the `JobAccumulatorsInfo` will 
be reused for both representations

* in `RestClusterClient` class , let `getAccumulators` return stringified 
accumulators(`Map`) and `getSerializedAccumulators` return 
serialized accumulators(`Map`)

@tillrohrmann and @GJL  hope for your opinions, Thanks.



> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376532#comment-16376532
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua I mean having two separate ways of querying: the server should 
have two endpoints (or differentiate based on a query parameter) and return 
either the stringified accumulators or the serialized accumulators. For now, 
the former would be used for the web frontend while the latter would be used by 
the `RestClusterClient`. @GJL maybe has a better opinion on how this should be 
done on the server side, though.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376501#comment-16376501
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@aljoscha yes, I agree. 

> I think it would be good to be able to **query** both representations

The "query" you mean `JobAccumulatorsHandler` or `RestClusterClient` 
behavior?

In `JobAccumulatorsHandler#handleRequest` method, we could query the 
accumulator's string and serialized representations, and boxed in 
`JobAccumulatorsInfo` object. 

1. let `getAccumulators` return accumulator's string representations and 
`getSerializedAccumulators` method return accumulator's serialized 
representations.

2.  let `getAccumulators` return both representations?

Which one is your idea?


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376470#comment-16376470
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5573
  
I think it would be good to be able to query both representations. The 
stringified version can be good if you want to do a quick query using a rest 
client. And we don't have to change the Web UI if we keep both versions.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376391#comment-16376391
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
 @GJL thanks for your guidance! 
 @aljoscha  and @tillrohrmann I have refactored this issue, remained the 
`getAccumulators` method created a `getSerializedAccumulators` method to return 
the serialized accumulators in `ClusterClient`. And override 
`getSerializedAccumulators` in `RestClusterClient`. For `JobAccumulatorsInfo`, 
I also remained `UserTaskAccumulator` and just added a new property for 
serialized accumulators. Please review, thanks.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376075#comment-16376075
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua Take a look at `SerializedValueDeserializer` and 
`SerializedValueSerializer`. You can use `@JsonSerialize(using= ... )`, 
`@JsonSerialize(keyUsing= ... )`, and `@JsonDeserialize` to specify the class 
for ser/des.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376069#comment-16376069
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
Hi @aljoscha and @tillrohrmann  I have tried to put `Map` into `JobAccumulatorsInfo` with two ways : 
*  a `@JsonProperty` 
* marked with `@JsonIgnore` and provide getter and setter

and the two ways both failed with different reason: 

* first way : throw a exception : `SerializedValue` no suitable constructor;
* second way : the setted object will be ignore by 
`HandlerUtils#sendResponse`

In `JobAccumulatorsHandler#handleRequest` I can not return `Map` directly, because this method's result type must 
extends `ResponseBody`.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376027#comment-16376027
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@aljoscha and @tillrohrmann , Thanks for your suggestion. I will rework for 
this. 
Moreover, @tillrohrmann would please review my another PR 
[FLINK-8459](https://github.com/apache/flink/pull/5565)? Thanks!


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376022#comment-16376022
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
I agree with @aljoscha. We could change the `JobAccumulatorsHandler` to 
return the serialized representation of the accumulator map instead of the 
stringified representation. We would then also have to adapt the web ui to work 
with this format.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376021#comment-16376021
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5573#discussion_r170448311
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -363,6 +367,27 @@ public String cancelWithSavepoint(JobID jobId, 
@Nullable String savepointDirecto
});
}
 
+   @Override
+   public Map getAccumulators(final JobID jobID) throws 
Exception {
+   final JobAccumulatorsHeaders accumulatorsHeaders = 
JobAccumulatorsHeaders.getInstance();
+   final JobMessageParameters  params = new JobMessageParameters();
+   params.jobPathParameter.resolve(jobID);
+
+   CompletableFuture responseFuture = 
sendRequest(
+   accumulatorsHeaders,
+   params
+   );
+
+   return responseFuture.thenApply((accumulatorsInfo) -> {
--- End diff --

Adding types to lambdas is always a good idea.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376006#comment-16376006
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua I'll have a look next week. From a first look it seems like we 
might have to extend the rest handler and the client to also return the 
accumulators as a `SerializedValue<>` as the old code for accumulators does, to 
allow user defined accumulator types.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375922#comment-16375922
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
HI @tillrohrmann and @aljoscha , who would review this PR? Thanks!


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375378#comment-16375378
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5573

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in 
RestClusterClient

## What is the purpose of the change

This PR Support ClusterClient.getAccumulators() in RestClusterClient.

## Brief change log

  - *Send REST request to get `JobAccumulatorsInfo` object*
  - *Use jackson's ObjectMapper convert `JobAccumulatorsInfo` object to Map*
  - *Add a test method into `RestClusterClientTest` class to test the 
`getAccumulators` function*
  - *Add a test handler to mock `JobAccumulatorsInfo` object*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test that validates that actual accumulators size equals we 
mocked in the test handler*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8756

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5573


commit ec8ef5d8ad6d650e250737d5005173994337168c
Author: vinoyang 
Date:   2018-02-24T06:50:55Z

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in 
RestClusterClient




> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)