[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5194


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r160950656
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JobExecutionResultHandler}.
+ */
+public class JobExecutionResultHandlerTest {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private JobExecutionResultHandler jobExecutionResultHandler;
+
+   @Mock
+   private RestfulGateway mockRestfulGateway;
--- End diff --

Alright, then let's keep it as it is.


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2018-01-11 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r160942627
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JobExecutionResultHandler}.
+ */
+public class JobExecutionResultHandlerTest {
--- End diff --

Added now


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2018-01-10 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r160684947
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JobExecutionResultHandler}.
+ */
+public class JobExecutionResultHandlerTest {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private JobExecutionResultHandler jobExecutionResultHandler;
+
+   @Mock
+   private RestfulGateway mockRestfulGateway;
--- End diff --

The implementation would need to allow setting the results of the methods 
`isJobExecutionResultPresent` and `getJobExecutionResult` (for this test). I 
think it wouldn't differ much from Mockito.
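
A minimal sketch of what such a hand-written test double might look like, next to the Mockito mock it would replace. This is not the `TestingRestfulGateway` from the Flink code base: the `ResultGateway` interface, the `String` job id and result types, and the setter names are all hypothetical simplifications (the real gateway methods are assumed to take further arguments such as a timeout). It only shows the idea of letting the test set the results of the two methods named above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Sketch only: every name in this file is hypothetical, not Flink API. */
class TestingGatewaySketch {

    /** Simplified stand-in for the two gateway methods discussed above. */
    interface ResultGateway {
        CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId);

        CompletableFuture<String> getJobExecutionResult(String jobId);
    }

    /** Test double whose answers are supplied by the test as plain functions. */
    static final class TestingResultGateway implements ResultGateway {

        // Default: no result is cached for any job.
        private Function<String, CompletableFuture<Boolean>> isPresentFunction =
            jobId -> CompletableFuture.completedFuture(false);

        // Default: asking for a result fails.
        private Function<String, CompletableFuture<String>> getResultFunction =
            jobId -> failed(new IllegalStateException("No result for job " + jobId));

        void setIsPresentFunction(Function<String, CompletableFuture<Boolean>> function) {
            this.isPresentFunction = function;
        }

        void setGetResultFunction(Function<String, CompletableFuture<String>> function) {
            this.getResultFunction = function;
        }

        @Override
        public CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId) {
            return isPresentFunction.apply(jobId);
        }

        @Override
        public CompletableFuture<String> getJobExecutionResult(String jobId) {
            return getResultFunction.apply(jobId);
        }
    }

    /** Returns a future that is already completed exceptionally. */
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        TestingResultGateway gateway = new TestingResultGateway();
        gateway.setIsPresentFunction(jobId -> CompletableFuture.completedFuture(true));
        gateway.setGetResultFunction(jobId -> CompletableFuture.completedFuture("result-of-" + jobId));

        System.out.println(gateway.isJobExecutionResultPresent("job-1").join()); // prints "true"
        System.out.println(gateway.getJobExecutionResult("job-1").join());       // prints "result-of-job-1"
    }
}
```

Compared with `when(...).thenReturn(...)`, the setup is about the same size, which is consistent with the remark that it would not differ much from Mockito; the difference is mainly that the behaviour lives in ordinary code that the compiler checks.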


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2018-01-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r160126458
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JobExecutionResultHandler}.
+ */
+public class JobExecutionResultHandlerTest {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private JobExecutionResultHandler jobExecutionResultHandler;
+
+   @Mock
+   private RestfulGateway mockRestfulGateway;
--- End diff --

Maybe we should implement a `TestingRestfulGateway` which we can use 
instead of Mockito.


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2018-01-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r160116451
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JobExecutionResultHandler}.
+ */
+public class JobExecutionResultHandlerTest {
--- End diff --

`extends TestLogger` is missing.


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158081176
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer serializedThrowableDeserializer =
+       new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+       super(JobExecutionResult.class);
+       final JavaType objectSerializedValueType = TypeFactory.defaultInstance()
+           .constructType(new TypeReference<SerializedValue<Object>>() {
+           });
+       serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
+       JobID jobId = null;
+       long netRuntime = -1;
+       SerializedThrowable serializedThrowable = null;
+       Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+       while (true) {
+           final JsonToken jsonToken = p.nextToken();
+           assertNotEndOfInput(p, jsonToken);
+           if (jsonToken == JsonToken.END_OBJECT) {
+               break;
+           }
+
+           final String fieldName = p.getValueAsString();
+           switch (fieldName) {
+               case JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+                   assertNextToken(p, JsonToken.VALUE_STRING);
+                   jobId = jobIdDeserializer.deserialize(p, ctxt);
+                   break;
+               case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+                   assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
+                   netRuntime = p.getLongValue();
+                   break;
+               case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+                   assertNextToken(p, JsonToken.START_OBJECT);
+                   accumulatorResults = parseAccumulatorResults(p, 

[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158078273
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer serializedThrowableDeserializer =
+       new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+       super(JobExecutionResult.class);
+       final JavaType objectSerializedValueType = TypeFactory.defaultInstance()
+           .constructType(new TypeReference<SerializedValue<Object>>() {
+           });
+       serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
+       JobID jobId = null;
+       long netRuntime = -1;
+       SerializedThrowable serializedThrowable = null;
+       Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+       while (true) {
+           final JsonToken jsonToken = p.nextToken();
+           assertNotEndOfInput(p, jsonToken);
+           if (jsonToken == JsonToken.END_OBJECT) {
+               break;
+           }
+
+           final String fieldName = p.getValueAsString();
+           switch (fieldName) {
+               case JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+                   assertNextToken(p, JsonToken.VALUE_STRING);
+                   jobId = jobIdDeserializer.deserialize(p, ctxt);
+                   break;
+               case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+                   assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
+                   netRuntime = p.getLongValue();
+                   break;
+               case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+                   assertNextToken(p, JsonToken.START_OBJECT);
+                   accumulatorResults = parseAccumulatorResults(p, 

[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5194

[FLINK-8233][flip6] Add JobExecutionResultHandler

## What is the purpose of the change

*Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. 
This will be needed so that accumulator results can be transmitted to the 
client.*

This PR is based on #5184.

## Brief change log

  - *Add `JobExecutionResultHandler` to enable retrieval of 
`JobExecutionResult`.*
  - *Add serializer and deserializer for `JobExecutionResult`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new and changed classes.*
  - *Manually ran the WordCount example job and fetched the 
`JobExecutionResult` with `curl`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8233-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5194


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.




---