[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5194 ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r160950656 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link JobExecutionResultHandler}. 
+ */ +public class JobExecutionResultHandlerTest { + + private static final JobID TEST_JOB_ID = new JobID(); + + private JobExecutionResultHandler jobExecutionResultHandler; + + @Mock + private RestfulGateway mockRestfulGateway; --- End diff -- alright, then let's keep it like it is. ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r160942627 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java --- @@ -0,0 +1,160 @@ [...] +/** + * Tests for {@link JobExecutionResultHandler}. + */ +public class JobExecutionResultHandlerTest { --- End diff -- Added now ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r160684947 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java --- @@ -0,0 +1,160 @@ [...] +/** + * Tests for {@link JobExecutionResultHandler}.
+ */ +public class JobExecutionResultHandlerTest { + + private static final JobID TEST_JOB_ID = new JobID(); + + private JobExecutionResultHandler jobExecutionResultHandler; + + @Mock + private RestfulGateway mockRestfulGateway; --- End diff -- The implementation would need to allow setting the results of the methods `isJobExecutionResultPresent` and `getJobExecutionResult` (for this test). I think it wouldn't differ much from Mockito. ---
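For illustration, a hand-written test gateway of the kind discussed in this comment could look roughly like the sketch below. It is not code from the PR: `ResultGateway` and `TestingResultGateway` are hypothetical names, and the two methods are cut down to take a `String` job ID and return a `String` result so the example compiles without Flink on the classpath. The real `RestfulGateway` signatures are assumed to differ.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, cut-down stand-in for the two gateway methods named in the comment.
// String replaces Flink's JobID and JobExecutionResult types (assumption for this sketch).
interface ResultGateway {
    CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId);

    CompletableFuture<String> getJobExecutionResult(String jobId);
}

// Hand-written test double: the test decides which future each method returns.
class TestingResultGateway implements ResultGateway {

    // By default no result is present ...
    private CompletableFuture<Boolean> resultPresent = CompletableFuture.completedFuture(false);

    // ... and the result future is left incomplete.
    private CompletableFuture<String> result = new CompletableFuture<>();

    void setResultPresent(CompletableFuture<Boolean> resultPresent) {
        this.resultPresent = resultPresent;
    }

    void setResult(CompletableFuture<String> result) {
        this.result = result;
    }

    @Override
    public CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId) {
        return resultPresent;
    }

    @Override
    public CompletableFuture<String> getJobExecutionResult(String jobId) {
        return result;
    }
}
```

A test would call `setResultPresent(...)` and `setResult(...)` where it currently calls `when(...).thenReturn(...)`, which is why the two approaches end up looking similar for a test this small.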
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r160126458 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java --- @@ -0,0 +1,160 @@ [...] +/** + * Tests for {@link JobExecutionResultHandler}.
+ */ +public class JobExecutionResultHandlerTest { + + private static final JobID TEST_JOB_ID = new JobID(); + + private JobExecutionResultHandler jobExecutionResultHandler; + + @Mock + private RestfulGateway mockRestfulGateway; --- End diff -- Maybe we should implement a `TestingRestfulGateway` which we can use instead of Mockito. ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r160116451 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java --- @@ -0,0 +1,160 @@ [...] +/** + * Tests for {@link JobExecutionResultHandler}. + */ +public class JobExecutionResultHandlerTest { --- End diff -- extends `TestLogger` missing ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158081176 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference<SerializedValue<Object>>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map<String, SerializedValue<Object>> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS: + assertNextToken(p, JsonToken.START_OBJECT); + accumulatorResults = parseAccumulatorResults(p,
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158078273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ [...] +/** + * JSON deserializer for {@link JobExecutionResult}.
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference<SerializedValue<Object>>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map<String, SerializedValue<Object>> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS: + assertNextToken(p, JsonToken.START_OBJECT); + accumulatorResults = parseAccumulatorResults(p,
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5194

[FLINK-8233][flip6] Add JobExecutionResultHandler

## What is the purpose of the change

*Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. This will be needed so that accumulator results can be transmitted to the client.*

This PR is based on #5184.

## Brief change log

- *Add `JobExecutionResultHandler` to enable retrieval of `JobExecutionResult`.*
- *Add serializer and deserializer for `JobExecutionResult`*

## Verifying this change

This change added tests and can be verified as follows:

- *Added unit tests for all new and changed classes.*
- *Manually ran the WordCount example job and fetched the `JobExecutionResult` with `curl`*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

CC: @tillrohrmann

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8233-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5194.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5194

commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao
Date: 2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao
Date: 2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao
Date: 2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

---
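As a rough illustration of the purpose stated in this description (retrieving a cached result over HTTP), the sketch below shows one way a handler could combine the two gateway methods mentioned in the review, `isJobExecutionResultPresent` and `getJobExecutionResult`. This is an assumption about the control flow, not the code from the PR: `CachedResultGateway`, `ResultResponse` and `ResultHandlerSketch` are hypothetical names, and `String` stands in for Flink's `JobID` and `JobExecutionResult` so the example compiles on its own.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the gateway methods; not Flink's real RestfulGateway.
interface CachedResultGateway {
    CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId);

    CompletableFuture<String> getJobExecutionResult(String jobId);
}

// Hypothetical response body: either "still in progress" or "completed" with a payload.
final class ResultResponse {
    final boolean completed;
    final String result; // null while the job is still in progress

    private ResultResponse(boolean completed, String result) {
        this.completed = completed;
        this.result = result;
    }

    static ResultResponse inProgress() {
        return new ResultResponse(false, null);
    }

    static ResultResponse created(String result) {
        return new ResultResponse(true, result);
    }
}

final class ResultHandlerSketch {

    // Asks whether a result is cached; fetches it only if it is, otherwise reports "in progress".
    static CompletableFuture<ResultResponse> handle(CachedResultGateway gateway, String jobId) {
        return gateway.isJobExecutionResultPresent(jobId).thenCompose(present -> {
            if (present) {
                return gateway.getJobExecutionResult(jobId).thenApply(ResultResponse::created);
            }
            return CompletableFuture.completedFuture(ResultResponse.inProgress());
        });
    }
}
```

Chaining with `thenCompose` keeps the handler non-blocking: a client that receives the in-progress response would simply poll again later.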