This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe920f  [FLINK-14807][rest] Introduce REST API for communication 
between clients and operator coordinators
9fe920f is described below

commit 9fe920fdc6f7eeaf2d99901099c842cfd0f1380a
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Fri May 15 07:46:31 2020 +0800

    [FLINK-14807][rest] Introduce REST API for communication between clients 
and operator coordinators
    
    This closes #12037
---
 .../deployment/ClusterClientJobClientAdapter.java  |  14 ++-
 .../deployment/application/EmbeddedJobClient.java  |  20 +++-
 .../apache/flink/client/program/ClusterClient.java |  13 +++
 .../flink/client/program/MiniClusterClient.java    |  20 ++++
 .../client/program/PerJobMiniClusterFactory.java   |  19 +++-
 .../client/program/rest/RestClusterClient.java     |  37 ++++++++
 .../flink/client/program/TestingClusterClient.java |  11 +++
 .../client/program/rest/RestClusterClientTest.java |  75 +++++++++++++++
 .../src/test/resources/rest_api_v1.snapshot        |  33 +++++++
 .../flink/runtime/dispatcher/Dispatcher.java       |  17 ++++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  15 +++
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  19 ++++
 .../flink/runtime/minicluster/MiniCluster.java     |  14 +++
 .../coordination/CoordinationRequest.java          |  27 ++++++
 .../coordination/CoordinationRequestGateway.java   |  39 ++++++++
 .../coordination/CoordinationRequestHandler.java   |  36 +++++++
 .../coordination/CoordinationResponse.java         |  27 ++++++
 .../coordination/ClientCoordinationHandler.java    |  84 +++++++++++++++++
 .../rest/messages/OperatorIDPathParameter.java     |  49 ++++++++++
 .../coordination/ClientCoordinationHeaders.java    |  80 ++++++++++++++++
 .../ClientCoordinationMessageParameters.java       |  49 ++++++++++
 .../ClientCoordinationRequestBody.java             |  56 +++++++++++
 .../ClientCoordinationResponseBody.java            |  56 +++++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  36 ++++++-
 .../flink/runtime/scheduler/SchedulerNG.java       |  12 +++
 .../flink/runtime/webmonitor/RestfulGateway.java   |  24 +++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   9 ++
 .../jobmaster/utils/TestingJobMasterGateway.java   |  14 ++-
 .../utils/TestingJobMasterGatewayBuilder.java      |  11 ++-
 .../OperatorCoordinatorSchedulerTest.java          |  48 ++++++++++
 .../TestingCoordinationRequestHandler.java         | 104 +++++++++++++++++++++
 .../webmonitor/TestingDispatcherGateway.java       |  14 ++-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  34 ++++++-
 .../environment/RemoteStreamEnvironmentTest.java   |  11 +++
 34 files changed, 1112 insertions(+), 15 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 3fb0e5c..94c31f2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -26,6 +26,10 @@ import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import org.apache.commons.io.IOUtils;
 
@@ -41,7 +45,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * An implementation of the {@link JobClient} interface that uses a {@link 
ClusterClient} underneath..
  */
-public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
+public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, 
CoordinationRequestGateway {
 
        private final ClusterClientProvider<ClusterID> clusterClientProvider;
 
@@ -115,6 +119,13 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient {
                                        })));
        }
 
+       @Override
+       public CompletableFuture<CoordinationResponse> 
sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+               return bridgeClientRequest(
+                       clusterClientProvider,
+                       clusterClient -> 
clusterClient.sendCoordinationRequest(jobID, operatorId, request));
+       }
+
        private static <T> CompletableFuture<T> bridgeClientRequest(
                        ClusterClientProvider<?> clusterClientProvider,
                        Function<ClusterClient<?>, CompletableFuture<T>> 
resultRetriever) {
@@ -132,5 +143,4 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient {
                return resultFuture.whenCompleteAsync(
                                (jobResult, throwable) -> 
IOUtils.closeQuietly(clusterClient::close));
        }
-
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
index 617ca08..e72c467 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
@@ -25,12 +25,19 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -42,7 +49,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * uses directly the {@link DispatcherGateway}.
  */
 @Internal
-public class EmbeddedJobClient implements JobClient {
+public class EmbeddedJobClient implements JobClient, 
CoordinationRequestGateway {
 
        private final JobID jobId;
 
@@ -119,4 +126,15 @@ public class EmbeddedJobClient implements JobClient {
                                        }
                                });
        }
+
+       @Override
+       public CompletableFuture<CoordinationResponse> 
sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+               try {
+                       SerializedValue<CoordinationRequest> serializedRequest 
= new SerializedValue<>(request);
+                       return 
dispatcherGateway.deliverCoordinationRequestToCoordinator(
+                               jobId, operatorId, serializedRequest, timeout);
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 952e428..e6843dc 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -162,4 +165,14 @@ public interface ClusterClient<T> extends AutoCloseable {
         * @return path future where the savepoint is located
         */
        CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable 
String savepointDirectory);
+
+       /**
+        * Sends out a request to a specified coordinator and return the 
response.
+        *
+        * @param jobId specifies the job which the coordinator belongs to
+        * @param operatorId specifies which coordinator to receive the request
+        * @param request the request to send
+        * @return the response from the coordinator
+        */
+       CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID 
jobId, OperatorID operatorId, CoordinationRequest request);
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 2eeb1f4..4c81681 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -24,12 +24,17 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -155,6 +161,20 @@ public class MiniClusterClient implements 
ClusterClient<MiniClusterClient.MiniCl
                }
        }
 
+       @Override
+       public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       CoordinationRequest request) {
+               try {
+                       SerializedValue<CoordinationRequest> serializedRequest 
= new SerializedValue<>(request);
+                       return 
miniCluster.deliverCoordinationRequestToCoordinator(jobId, operatorId, 
serializedRequest);
+               } catch (IOException e) {
+                       LOG.error("Error while sending coordination request", 
e);
+                       return FutureUtils.completedExceptionally(e);
+               }
+       }
+
        /**
         * The type of the Cluster ID for the local {@link MiniCluster}.
         */
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
index f8998a2..16ca6b6 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
@@ -26,18 +26,25 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -127,7 +134,7 @@ public final class PerJobMiniClusterFactory {
        /**
         * A {@link JobClient} for a {@link PerJobMiniClusterFactory}.
         */
-       private static final class PerJobMiniClusterJobClient implements 
JobClient {
+       private static final class PerJobMiniClusterJobClient implements 
JobClient, CoordinationRequestGateway {
 
                private final JobID jobID;
                private final MiniCluster miniCluster;
@@ -182,5 +189,15 @@ public final class PerJobMiniClusterFactory {
                                }
                        });
                }
+
+               @Override
+               public CompletableFuture<CoordinationResponse> 
sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+                       try {
+                               SerializedValue<CoordinationRequest> 
serializedRequest = new SerializedValue<>(request);
+                               return 
miniCluster.deliverCoordinationRequestToCoordinator(jobID, operatorId, 
serializedRequest);
+                       } catch (IOException e) {
+                               return FutureUtils.completedExceptionally(e);
+                       }
+               }
        }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index f2c6aa1..0b6601c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -37,9 +37,12 @@ import 
org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import 
org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
@@ -67,6 +70,9 @@ import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -88,6 +94,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -411,6 +418,36 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                return triggerSavepoint(jobId, savepointDirectory, false);
        }
 
+       @Override
+       public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       CoordinationRequest request) {
+               ClientCoordinationHeaders headers = 
ClientCoordinationHeaders.getInstance();
+               ClientCoordinationMessageParameters params = new 
ClientCoordinationMessageParameters();
+               params.jobPathParameter.resolve(jobId);
+               params.operatorPathParameter.resolve(operatorId);
+
+               SerializedValue<CoordinationRequest> serializedRequest;
+               try {
+                       serializedRequest = new SerializedValue<>(request);
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
+
+               ClientCoordinationRequestBody requestBody = new 
ClientCoordinationRequestBody(serializedRequest);
+               return sendRequest(headers, params, requestBody).thenApply(
+                       responseBody -> {
+                               try {
+                                       return responseBody
+                                               
.getSerializedCoordinationResponse()
+                                               
.deserializeValue(getClass().getClassLoader());
+                               } catch (IOException | ClassNotFoundException 
e) {
+                                       throw new CompletionException("Failed 
to deserialize coordination response", e);
+                               }
+                       });
+       }
+
        private CompletableFuture<String> triggerSavepoint(
                        final JobID jobId,
                        final @Nullable String savepointDirectory,
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index 46df5ea..25124a0 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nonnull;
@@ -133,6 +136,14 @@ public class TestingClusterClient<T> implements 
ClusterClient<T> {
        }
 
        @Override
+       public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       CoordinationRequest request) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public void close() {
 
        }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index ff1f5ff..a55b9e7 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -39,10 +39,13 @@ import 
org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.RestClient;
@@ -76,6 +79,10 @@ import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -701,6 +708,74 @@ public class RestClusterClientTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testSendCoordinationRequest() throws Exception {
+               final TestClientCoordinationHandler handler = new 
TestClientCoordinationHandler();
+
+               try (TestRestServerEndpoint restServerEndpoint = 
createRestServerEndpoint(handler)) {
+                       RestClusterClient<?> restClusterClient = 
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+                       String payload = "testing payload";
+                       TestCoordinationRequest<String> request  = new 
TestCoordinationRequest<>(payload);
+                       try {
+                               CompletableFuture<CoordinationResponse> future =
+                                       
restClusterClient.sendCoordinationRequest(jobId, new OperatorID(), request);
+                               TestCoordinationResponse response = 
(TestCoordinationResponse) future.get();
+
+                               assertEquals(payload, response.payload);
+                       } finally {
+                               restClusterClient.close();
+                       }
+               }
+       }
+
+       private class TestClientCoordinationHandler extends 
TestHandler<ClientCoordinationRequestBody, ClientCoordinationResponseBody, 
ClientCoordinationMessageParameters> {
+
+               private TestClientCoordinationHandler() {
+                       super(ClientCoordinationHeaders.getInstance());
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               protected CompletableFuture<ClientCoordinationResponseBody> 
handleRequest(@Nonnull HandlerRequest<ClientCoordinationRequestBody, 
ClientCoordinationMessageParameters> request, @Nonnull DispatcherGateway 
gateway) throws RestHandlerException {
+                       try {
+                               TestCoordinationRequest req =
+                                       (TestCoordinationRequest) request
+                                               .getRequestBody()
+                                               
.getSerializedCoordinationRequest()
+                                               
.deserializeValue(getClass().getClassLoader());
+                               TestCoordinationResponse resp = new 
TestCoordinationResponse(req.payload);
+                               return CompletableFuture.completedFuture(
+                                       new ClientCoordinationResponseBody(
+                                               new SerializedValue<>(resp)));
+                       } catch (Exception e) {
+                               return FutureUtils.completedExceptionally(e);
+                       }
+               }
+       }
+
+       private static class TestCoordinationRequest<T> implements 
CoordinationRequest {
+
+               private static final long serialVersionUID = 1L;
+
+               private final T payload;
+
+               private TestCoordinationRequest(T payload) {
+                       this.payload = payload;
+               }
+       }
+
+       private static class TestCoordinationResponse<T> implements 
CoordinationResponse {
+
+               private static final long serialVersionUID = 1L;
+
+               private final T payload;
+
+               private TestCoordinationResponse(T payload) {
+                       this.payload = payload;
+               }
+       }
+
        private class TestAccumulatorHandler extends 
TestHandler<EmptyRequestBody, JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> {
 
                public TestAccumulatorHandler() {
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index af55a7a..b14b741 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1430,6 +1430,39 @@
       "type" : "any"
     }
   }, {
+    "url" : "/jobs/:jobid/coordinators/:operatorid",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      }, {
+        "key" : "operatorid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationRequestBody",
+      "properties" : {
+        "serializedCoordinationRequest" : {
+          "type" : "any"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationResponseBody",
+      "properties" : {
+        "serializedCoordinationResult" : {
+          "type" : "any"
+        }
+      }
+    }
+  }, {
     "url" : "/jobs/:jobid/exceptions",
     "method" : "GET",
     "status-code" : "200 OK",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 05aab6e..dde3908 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl;
@@ -55,6 +56,8 @@ import 
org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -66,6 +69,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.util.function.FunctionUtils;
@@ -625,6 +629,19 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
+       @Override
+       public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       SerializedValue<CoordinationRequest> serializedRequest,
+                       Time timeout) {
+               final CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+
+               return jobMasterGatewayFuture.thenCompose(
+                       (JobMasterGateway jobMasterGateway) ->
+                               
jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, 
serializedRequest, timeout));
+       }
+
        /**
         * Cleans up the job related data from the dispatcher. If cleanupHA is 
true, then
         * the data will also be removed from HA.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 543ef12..d5087cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -62,6 +62,8 @@ import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -718,6 +720,19 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                return 
CompletableFuture.completedFuture(aggregateFunction.getResult(accumulator));
        }
 
+       @Override
+       public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+                       OperatorID operatorId,
+                       SerializedValue<CoordinationRequest> serializedRequest,
+                       Time timeout) {
+               try {
+                       CoordinationRequest request = 
serializedRequest.deserializeValue(userCodeLoader);
+                       return 
schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request);
+               } catch (Exception e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Internal methods
        
//----------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 1006dad..16decc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -33,8 +33,11 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -44,6 +47,7 @@ import 
org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
@@ -271,4 +275,19 @@ public interface JobMasterGateway extends
         * @return The updated aggregate
         */
        CompletableFuture<Object> updateGlobalAggregate(String aggregateName, 
Object aggregand, byte[] serializedAggregationFunction);
+
+       /**
+        * Deliver a coordination request to a specified coordinator and return 
the response.
+        *
+        * @param operatorId identifying the coordinator to receive the request
+        * @param serializedRequest serialized request to deliver
+        * @return A future containing the response.
+        *         The response will fail with a {@link 
org.apache.flink.util.FlinkException}
+        *         if the task is not running, or no operator/coordinator 
exists for the given ID,
+        *         or the coordinator cannot handle client events.
+        */
+       CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+               OperatorID operatorId,
+               SerializedValue<CoordinationRequest> serializedRequest,
+               @RpcTimeout Time timeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 556af4a..58e08ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -60,6 +61,8 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.ReporterSetup;
 import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
 import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
@@ -79,6 +82,7 @@ import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
@@ -601,6 +605,16 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                return runDispatcherCommand(dispatcherGateway -> 
dispatcherGateway.requestJob(jobId, rpcTimeout));
        }
 
+       public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       SerializedValue<CoordinationRequest> serializedRequest) 
{
+               return runDispatcherCommand(
+                       dispatcherGateway ->
+                               
dispatcherGateway.deliverCoordinationRequestToCoordinator(
+                                       jobId, operatorId, serializedRequest, 
rpcTimeout));
+       }
+
        private <T> CompletableFuture<T> 
runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> 
dispatcherCommand) {
                return 
getDispatcherGatewayFuture().thenApply(dispatcherCommand).thenCompose(Function.identity());
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
new file mode 100644
index 0000000..a02678d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all requests from the client to a {@link 
OperatorCoordinator}
+ * which requests for a {@link CoordinationResponse}.
+ */
+public interface CoordinationRequest extends Serializable {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
new file mode 100644
index 0000000..0767569
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client interface which sends out a {@link CoordinationRequest} and
+ * expects for a {@link CoordinationResponse} from a {@link 
OperatorCoordinator}.
+ */
+public interface CoordinationRequestGateway {
+
+       /**
+        * Send out a request to a specified coordinator and return the 
response.
+        *
+        * @param operatorId specifies which coordinator to receive the request
+        * @param request the request to send
+        * @return the response from the coordinator
+        */
+       CompletableFuture<CoordinationResponse> 
sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
new file mode 100644
index 0000000..3514d1e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Coordinator interface which can handle {@link CoordinationRequest}s
+ * and response with {@link CoordinationResponse}s to the client.
+ */
+public interface CoordinationRequestHandler {
+
+       /**
+        * Called when receiving a request from the client.
+        *
+        * @param request the request received
+        * @return a future containing the response from the coordinator for 
this request
+        */
+       CompletableFuture<CoordinationResponse> 
handleCoordinationRequest(CoordinationRequest request);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
new file mode 100644
index 0000000..d28e30b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all responses from a {@link OperatorCoordinator} to the 
client
+ * which is the response for a {@link CoordinationRequest}.
+ */
+public interface CoordinationResponse extends Serializable {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
new file mode 100644
index 0000000..e361f64
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.coordination;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Handler that receives the coordination requests from the client and returns 
the response from the coordinator.
+ */
+public class ClientCoordinationHandler extends 
AbstractRestHandler<RestfulGateway, ClientCoordinationRequestBody, 
ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+       public ClientCoordinationHandler(
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       Map<String, String> responseHeaders,
+                       MessageHeaders<ClientCoordinationRequestBody, 
ClientCoordinationResponseBody, ClientCoordinationMessageParameters> 
messageHeaders) {
+               super(leaderRetriever, timeout, responseHeaders, 
messageHeaders);
+       }
+
+       @Override
+       protected CompletableFuture<ClientCoordinationResponseBody> 
handleRequest(
+                       @Nonnull HandlerRequest<ClientCoordinationRequestBody, 
ClientCoordinationMessageParameters> request,
+                       @Nonnull RestfulGateway gateway) throws 
RestHandlerException {
+               JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+               OperatorID operatorId = 
request.getPathParameter(OperatorIDPathParameter.class);
+               SerializedValue<CoordinationRequest> serializedRequest =
+                       
request.getRequestBody().getSerializedCoordinationRequest();
+               CompletableFuture<CoordinationResponse> responseFuture =
+                       gateway.deliverCoordinationRequestToCoordinator(jobId, 
operatorId, serializedRequest, timeout);
+               return responseFuture.thenApply(
+                       coordinationResponse -> {
+                               try {
+                                       return new 
ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
+                               } catch (IOException e) {
+                                       throw new CompletionException(
+                                               new RestHandlerException(
+                                                       "Failed to serialize 
coordination response",
+                                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                                       e));
+                               }
+                       });
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
new file mode 100644
index 0000000..87d985c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Path parameter identifying operators.
+ */
+public class OperatorIDPathParameter extends MessagePathParameter<OperatorID> {
+
+       public static final String KEY = "operatorid";
+
+       public OperatorIDPathParameter() {
+               super(KEY);
+       }
+
+       @Override
+       protected OperatorID convertFromString(String value) throws 
ConversionException {
+               return new OperatorID(StringUtils.hexStringToByte(value));
+       }
+
+       @Override
+       protected String convertToString(OperatorID value) {
+               return value.toString();
+       }
+
+       @Override
+       public String getDescription() {
+               return "string value that identifies an operator.";
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
new file mode 100644
index 0000000..aded1e3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link ClientCoordinationHandler}.
+ */
+@Documentation.ExcludeFromDocumentation(
+       "This API is not exposed to the users, as coordinators are used only 
internally.")
+public class ClientCoordinationHeaders implements 
MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, 
ClientCoordinationMessageParameters> {
+
+       public static final String URL = 
"/jobs/:jobid/coordinators/:operatorid";
+
+       private static final ClientCoordinationHeaders INSTANCE = new 
ClientCoordinationHeaders();
+
+       private ClientCoordinationHeaders() {}
+
+       @Override
+       public Class<ClientCoordinationRequestBody> getRequestClass() {
+               return ClientCoordinationRequestBody.class;
+       }
+
+       @Override
+       public Class<ClientCoordinationResponseBody> getResponseClass() {
+               return ClientCoordinationResponseBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public ClientCoordinationMessageParameters 
getUnresolvedMessageParameters() {
+               return new ClientCoordinationMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.POST;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static ClientCoordinationHeaders getInstance() {
+               return INSTANCE;
+       }
+
+       @Override
+       public String getDescription() {
+               return "Send a request to a specified coordinator of the 
specified job and get the response. " +
+                       "This API is for internal use only.";
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
new file mode 100644
index 0000000..01be123
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import 
org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link ClientCoordinationHandler}.
+ */
+public class ClientCoordinationMessageParameters extends MessageParameters {
+
+       public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+       public final OperatorIDPathParameter operatorPathParameter = new 
OperatorIDPathParameter();
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Arrays.asList(jobPathParameter, operatorPathParameter);
+       }
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return Collections.emptyList();
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
new file mode 100644
index 0000000..e663320
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * Request that carries a serialized {@link CoordinationRequest} to a 
specified coordinator.
+ */
+public class ClientCoordinationRequestBody implements RequestBody {
+
+       public static final String FIELD_NAME_SERIALIZED_COORDINATION_REQUEST = 
"serializedCoordinationRequest";
+
+       @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST)
+       @JsonSerialize(using = SerializedValueSerializer.class)
+       @JsonDeserialize(using = SerializedValueDeserializer.class)
+       private final SerializedValue<CoordinationRequest> 
serializedCoordinationRequest;
+
+       @JsonCreator
+       public ClientCoordinationRequestBody(
+               @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST)
+                       SerializedValue<CoordinationRequest> 
serializedCoordinationRequest) {
+               this.serializedCoordinationRequest = 
serializedCoordinationRequest;
+       }
+
+       @JsonIgnore
+       public SerializedValue<CoordinationRequest> 
getSerializedCoordinationRequest() {
+               return serializedCoordinationRequest;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
new file mode 100644
index 0000000..5d1cdf2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * Response that carries a serialized {@link CoordinationResponse} to the 
client.
+ */
+public class ClientCoordinationResponseBody implements ResponseBody {
+
+       public static final String FIELD_NAME_SERIALIZED_COORDINATION_RESULT = 
"serializedCoordinationResult";
+
+       @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT)
+       @JsonSerialize(using = SerializedValueSerializer.class)
+       @JsonDeserialize(using = SerializedValueDeserializer.class)
+       private final SerializedValue<CoordinationResponse> 
serializedCoordinationResponse;
+
+       @JsonCreator
+       public ClientCoordinationResponseBody(
+               @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT)
+                       SerializedValue<CoordinationResponse> 
serializedCoordinationResponse) {
+               this.serializedCoordinationResponse = 
serializedCoordinationResponse;
+       }
+
+       @JsonIgnore
+       public SerializedValue<CoordinationResponse> 
getSerializedCoordinationResponse() {
+               return serializedCoordinationResponse;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 2be0e5c..7cc17dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -76,6 +76,9 @@ import 
org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
@@ -107,6 +110,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -164,6 +168,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 
        protected final ExecutionVertexVersioner executionVertexVersioner;
 
+       private final Map<OperatorID, OperatorCoordinator> coordinatorMap;
+
        private ComponentMainThreadExecutor mainThreadExecutor = new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
                "SchedulerBase is not initialized with proper main thread 
executor. " +
                        "Call to SchedulerBase.setMainThreadExecutor(...) 
required.");
@@ -222,6 +228,8 @@ public abstract class SchedulerBase implements SchedulerNG {
                this.schedulingTopology = 
executionGraph.getSchedulingTopology();
 
                this.inputsLocationsRetriever = new 
ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+               this.coordinatorMap = createCoordinatorMap();
        }
 
        private ExecutionGraph createAndRestoreExecutionGraph(
@@ -932,6 +940,20 @@ public abstract class SchedulerBase implements SchedulerNG 
{
                }
        }
 
+       @Override
+       public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+                       OperatorID operator,
+                       CoordinationRequest request) throws FlinkException {
+               OperatorCoordinator coordinator = coordinatorMap.get(operator);
+               if (coordinator instanceof CoordinationRequestHandler) {
+                       return ((CoordinationRequestHandler) 
coordinator).handleCoordinationRequest(request);
+               } else if (coordinator != null) {
+                       throw new FlinkException("Coordinator of operator " + 
operator + " cannot handle client event");
+               } else {
+                       throw new FlinkException("Coordinator of operator " + 
operator + " does not exist");
+               }
+       }
+
        private void startAllOperatorCoordinators() {
                final Collection<OperatorCoordinator> coordinators = 
getAllCoordinators();
                try {
@@ -951,8 +973,16 @@ public abstract class SchedulerBase implements SchedulerNG 
{
        }
 
        private Collection<OperatorCoordinator> getAllCoordinators() {
-               return getExecutionGraph().getAllVertices().values().stream()
-                       .flatMap((vertex) -> 
vertex.getOperatorCoordinators().stream())
-                       .collect(Collectors.toList());
+               return coordinatorMap.values();
+       }
+
+       private Map<OperatorID, OperatorCoordinator> createCoordinatorMap() {
+               Map<OperatorID, OperatorCoordinator> coordinatorMap = new 
HashMap<>();
+               for (ExecutionJobVertex vertex : 
getExecutionGraph().getAllVertices().values()) {
+                       for (Map.Entry<OperatorID, OperatorCoordinator> entry : 
vertex.getOperatorCoordinatorMap().entrySet()) {
+                               coordinatorMap.put(entry.getKey(), 
entry.getValue());
+                       }
+               }
+               return coordinatorMap;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 5a75d37..07f7e7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -40,6 +40,8 @@ import 
org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
@@ -138,4 +140,14 @@ public interface SchedulerNG {
         *                        for the given ID.
         */
        void deliverOperatorEventToCoordinator(ExecutionAttemptID 
taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException;
+
+       /**
+        * Delivers a coordination request to the {@link OperatorCoordinator} 
with the given {@link OperatorID}
+        * and returns the coordinator's response.
+        *
+        * @return A future containing the response.
+        * @throws FlinkException Thrown, if the task is not running, or no 
operator/coordinator exists
+        *                        for the given ID, or the coordinator cannot 
handle client events.
+        */
+       CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(OperatorID operator, 
CoordinationRequest request) throws FlinkException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index be0e45a..c736565 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -26,15 +26,19 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -186,4 +190,24 @@ public interface RestfulGateway extends RpcGateway {
        default CompletableFuture<Acknowledge> shutDownCluster() {
                throw new UnsupportedOperationException();
        }
+
+       /**
+        * Deliver a coordination request to a specified coordinator and return 
the response.
+        *
+        * @param jobId identifying the job which the coordinator belongs to
+        * @param operatorId identifying the coordinator to receive the request
+        * @param serializedRequest serialized request to deliver
+        * @param timeout RPC timeout
+        * @return A future containing the response.
+        *         The response will fail with a {@link 
org.apache.flink.util.FlinkException}
+        *         if the task is not running, or no operator/coordinator 
exists for the given ID,
+        *         or the coordinator cannot handle client events.
+        */
+       default CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       SerializedValue<CoordinationRequest> serializedRequest,
+                       @RpcTimeout Time timeout) {
+               throw new UnsupportedOperationException();
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index c17b2b7..fd67f86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -64,6 +64,7 @@ import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler;
@@ -119,6 +120,7 @@ import 
org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -559,6 +561,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                final 
ClusterDataSetDeleteHandlers.ClusterDataSetDeleteStatusHandler 
clusterDataSetDeleteStatusHandler =
                        clusterDataSetDeleteHandlers.new 
ClusterDataSetDeleteStatusHandler(leaderRetriever, timeout, responseHeaders);
 
+               final ClientCoordinationHandler clientCoordinationHandler = new 
ClientCoordinationHandler(
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       ClientCoordinationHeaders.getInstance());
+
                final ShutdownHandler shutdownHandler = new ShutdownHandler(
                        leaderRetriever,
                        timeout,
@@ -625,6 +633,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                
handlers.add(Tuple2.of(clusterDataSetListHandler.getMessageHeaders(), 
clusterDataSetListHandler));
                
handlers.add(Tuple2.of(clusterDataSetDeleteTriggerHandler.getMessageHeaders(), 
clusterDataSetDeleteTriggerHandler));
                
handlers.add(Tuple2.of(clusterDataSetDeleteStatusHandler.getMessageHeaders(), 
clusterDataSetDeleteStatusHandler));
+               
handlers.add(Tuple2.of(clientCoordinationHandler.getMessageHeaders(), 
clientCoordinationHandler));
 
                // TODO: Remove once the Yarn proxy can forward all REST verbs
                
handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), 
yarnJobCancelTerminationHandler));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 47b9206..59b3cc5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -42,6 +42,8 @@ import 
org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -157,6 +159,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        @Nonnull
        private final TriFunction<ExecutionAttemptID, OperatorID, 
SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> 
operatorEventSender;
 
+       @Nonnull
+       private final BiFunction<OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestFunction;
+
        public TestingJobMasterGateway(
                        @Nonnull String address,
                        @Nonnull String hostname,
@@ -185,7 +190,8 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                        @Nonnull Function<Tuple6<JobID, JobVertexID, 
KeyGroupRange, String, KvStateID, InetSocketAddress>, 
CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction,
                        @Nonnull Function<Tuple4<JobID, JobVertexID, 
KeyGroupRange, String>, CompletableFuture<Acknowledge>> 
notifyKvStateUnregisteredFunction,
                        @Nonnull TriFunction<String, Object, byte[], 
CompletableFuture<Object>> updateAggregateFunction,
-                       @Nonnull TriFunction<ExecutionAttemptID, OperatorID, 
SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> 
operatorEventSender) {
+                       @Nonnull TriFunction<ExecutionAttemptID, OperatorID, 
SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> 
operatorEventSender,
+                       @Nonnull BiFunction<OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestFunction) {
                this.address = address;
                this.hostname = hostname;
                this.cancelFunction = cancelFunction;
@@ -214,6 +220,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                this.notifyKvStateUnregisteredFunction = 
notifyKvStateUnregisteredFunction;
                this.updateAggregateFunction = updateAggregateFunction;
                this.operatorEventSender = operatorEventSender;
+               this.deliverCoordinationRequestFunction = 
deliverCoordinationRequestFunction;
        }
 
        @Override
@@ -360,4 +367,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        public CompletableFuture<Acknowledge> 
sendOperatorEventToCoordinator(ExecutionAttemptID task, OperatorID operatorID, 
SerializedValue<OperatorEvent> event) {
                return operatorEventSender.apply(task, operatorID, event);
        }
+
+       @Override
+       public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(OperatorID operatorId, 
SerializedValue<CoordinationRequest> serializedRequest, Time timeout) {
+               return deliverCoordinationRequestFunction.apply(operatorId, 
serializedRequest);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index a960448..20202cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -41,6 +41,8 @@ import 
org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -102,6 +104,7 @@ public class TestingJobMasterGatewayBuilder {
        private Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, 
CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
        private TriFunction<String, Object, byte[], CompletableFuture<Object>> 
updateAggregateFunction = (a, b, c) -> CompletableFuture.completedFuture(new 
Object());
        private TriFunction<ExecutionAttemptID, OperatorID, 
SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> 
operatorEventSender = (a, b, c) -> 
CompletableFuture.completedFuture(Acknowledge.get());
+       private BiFunction<OperatorID, SerializedValue<CoordinationRequest>, 
CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction = 
(a, b) -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
 
        public TestingJobMasterGatewayBuilder setAddress(String address) {
                this.address = address;
@@ -243,6 +246,11 @@ public class TestingJobMasterGatewayBuilder {
                return this;
        }
 
+       public TestingJobMasterGatewayBuilder 
setDeliverCoordinationRequestFunction(BiFunction<OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestFunction) {
+               this.deliverCoordinationRequestFunction = 
deliverCoordinationRequestFunction;
+               return this;
+       }
+
        public TestingJobMasterGateway build() {
                return new TestingJobMasterGateway(
                        address,
@@ -272,6 +280,7 @@ public class TestingJobMasterGatewayBuilder {
                        notifyKvStateRegisteredFunction,
                        notifyKvStateUnregisteredFunction,
                        updateAggregateFunction,
-                       operatorEventSender);
+                       operatorEventSender,
+                       deliverCoordinationRequestFunction);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 17a82f0..8ecf172 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -35,6 +36,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -163,6 +165,52 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                assertThat(result, futureFailedWith(TestException.class));
        }
 
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testDeliveringClientRequestToRequestHandler() throws 
Exception {
+               final OperatorCoordinator.Provider provider = new 
TestingCoordinationRequestHandler.Provider(testOperatorId);
+               final DefaultScheduler scheduler = createScheduler(provider);
+
+               final String payload = "testing payload";
+               final TestingCoordinationRequestHandler.Request<String> request 
=
+                       new 
TestingCoordinationRequestHandler.Request<>(payload);
+               final TestingCoordinationRequestHandler.Response<String> 
response =
+                       (TestingCoordinationRequestHandler.Response<String>)
+                               
scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, 
request).get();
+
+               assertEquals(payload, response.getPayload());
+       }
+
+       @Test
+       public void testDeliveringClientRequestToNonRequestHandler() throws 
Exception {
+               final OperatorCoordinator.Provider provider = new 
TestingOperatorCoordinator.Provider(testOperatorId);
+               final DefaultScheduler scheduler = createScheduler(provider);
+
+               final String payload = "testing payload";
+               final TestingCoordinationRequestHandler.Request<String> request 
=
+                       new 
TestingCoordinationRequestHandler.Request<>(payload);
+
+               CommonTestUtils.assertThrows(
+                       "cannot handle client event",
+                       FlinkException.class,
+                       () -> 
scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request));
+       }
+
+       @Test
+       public void testDeliveringClientRequestToNonExistingCoordinator() 
throws Exception {
+               final OperatorCoordinator.Provider provider = new 
TestingOperatorCoordinator.Provider(testOperatorId);
+               final DefaultScheduler scheduler = createScheduler(provider);
+
+               final String payload = "testing payload";
+               final TestingCoordinationRequestHandler.Request<String> request 
=
+                       new 
TestingCoordinationRequestHandler.Request<>(payload);
+
+               CommonTestUtils.assertThrows(
+                       "does not exist",
+                       FlinkException.class,
+                       () -> 
scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), request));
+       }
+
        // 
------------------------------------------------------------------------
        //  test setups
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
new file mode 100644
index 0000000..4aa3c43
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A simple testing implementation of the {@link CoordinationRequestHandler}.
+ */
+public class TestingCoordinationRequestHandler extends 
TestingOperatorCoordinator implements CoordinationRequestHandler {
+
+       public TestingCoordinationRequestHandler(Context context) {
+               super(context);
+       }
+
+       @Override
+       public CompletableFuture<CoordinationResponse> 
handleCoordinationRequest(CoordinationRequest request) {
+               Request req = (Request) request;
+               return CompletableFuture.completedFuture(new 
Response<>(req.getPayload()));
+       }
+
+       /**
+        * A testing stub for an {@link OperatorCoordinator.Provider} that 
creates a
+        * {@link TestingCoordinationRequestHandler}.
+        */
+       public static final class Provider implements 
OperatorCoordinator.Provider {
+
+               private static final long serialVersionUID = 1L;
+
+               private final OperatorID operatorId;
+
+               public Provider(OperatorID operatorId) {
+                       this.operatorId = operatorId;
+               }
+
+               @Override
+               public OperatorID getOperatorId() {
+                       return operatorId;
+               }
+
+               @Override
+               public OperatorCoordinator create(Context context) {
+                       return new TestingCoordinationRequestHandler(context);
+               }
+       }
+
+       /**
+        * A {@link CoordinationRequest} that a {@link 
TestingCoordinationRequestHandler} receives.
+        *
+        * @param <T> payload type
+        */
+       public static class Request<T> implements CoordinationRequest {
+
+               private static final long serialVersionUID = 1L;
+
+               private final T payload;
+
+               public Request(T payload) {
+                       this.payload = payload;
+               }
+
+               public T getPayload() {
+                       return payload;
+               }
+       }
+
+       /**
+        * A {@link CoordinationResponse} that a {@link 
TestingCoordinationRequestHandler} gives.
+        *
+        * @param <T> payload type
+        */
+       public static class Response<T> implements CoordinationResponse {
+
+               private static final long serialVersionUID = 1L;
+
+               private final T payload;
+
+               public Response(T payload) {
+                       this.payload = payload;
+               }
+
+               public T getPayload() {
+                       return payload;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index 4afd56d..becf513 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -29,12 +29,17 @@ import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -92,7 +97,8 @@ public final class TestingDispatcherGateway extends 
TestingRestfulGateway implem
                        DispatcherId fencingToken,
                        Function<JobID, 
CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction,
                        Supplier<CompletableFuture<Acknowledge>> 
clusterShutdownSupplier,
-                       Function<ApplicationStatus, 
CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction) {
+                       Function<ApplicationStatus, 
CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction,
+                       TriFunction<JobID, OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestToCoordinatorFunction) {
                super(
                        address,
                        hostname,
@@ -107,7 +113,8 @@ public final class TestingDispatcherGateway extends 
TestingRestfulGateway implem
                        requestOperatorBackPressureStatsFunction,
                        triggerSavepointFunction,
                        stopWithSavepointFunction,
-                       clusterShutdownSupplier);
+                       clusterShutdownSupplier,
+                       deliverCoordinationRequestToCoordinatorFunction);
                this.submitFunction = submitFunction;
                this.listFunction = listFunction;
                this.blobServerPort = blobServerPort;
@@ -219,7 +226,8 @@ public final class TestingDispatcherGateway extends 
TestingRestfulGateway implem
                                fencingToken,
                                requestArchivedJobFunction,
                                clusterShutdownSupplier,
-                               clusterShutdownWithStatusFunction);
+                               clusterShutdownWithStatusFunction,
+                               
deliverCoordinationRequestToCoordinatorFunction);
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index b7cf02d..0acc5ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -26,12 +26,17 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -57,6 +62,7 @@ public class TestingRestfulGateway implements RestfulGateway {
        static final BiFunction<JobID, JobVertexID, 
CompletableFuture<OperatorBackPressureStatsResponse>> 
DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER = (jobId, jobVertexId) -> 
FutureUtils.completedExceptionally(new UnsupportedOperationException());
        static final BiFunction<JobID, String, CompletableFuture<String>> 
DEFAULT_TRIGGER_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> 
FutureUtils.completedExceptionally(new UnsupportedOperationException());
        static final BiFunction<JobID, String, CompletableFuture<String>> 
DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> 
FutureUtils.completedExceptionally(new UnsupportedOperationException());
+       static final TriFunction<JobID, OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION = (JobID jobId, 
OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest) 
-> FutureUtils.completedExceptionally(new UnsupportedOperationException());
        static final String LOCALHOST = "localhost";
 
        protected String address;
@@ -89,6 +95,8 @@ public class TestingRestfulGateway implements RestfulGateway {
 
        protected BiFunction<JobID, String, CompletableFuture<String>> 
stopWithSavepointFunction;
 
+       protected TriFunction<JobID, OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestToCoordinatorFunction;
+
        public TestingRestfulGateway() {
                this(
                        LOCALHOST,
@@ -104,7 +112,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                        DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER,
                        DEFAULT_TRIGGER_SAVEPOINT_FUNCTION,
                        DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION,
-                       DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER);
+                       DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER,
+                       
DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION);
        }
 
        public TestingRestfulGateway(
@@ -121,7 +130,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                        BiFunction<JobID, JobVertexID, 
CompletableFuture<OperatorBackPressureStatsResponse>> 
requestOperatorBackPressureStatsFunction,
                        BiFunction<JobID, String, CompletableFuture<String>> 
triggerSavepointFunction,
                        BiFunction<JobID, String, CompletableFuture<String>> 
stopWithSavepointFunction,
-                       Supplier<CompletableFuture<Acknowledge>> 
clusterShutdownSupplier) {
+                       Supplier<CompletableFuture<Acknowledge>> 
clusterShutdownSupplier,
+                       TriFunction<JobID, OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestToCoordinatorFunction) {
                this.address = address;
                this.hostname = hostname;
                this.cancelJobFunction = cancelJobFunction;
@@ -136,6 +146,7 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                this.triggerSavepointFunction = triggerSavepointFunction;
                this.stopWithSavepointFunction = stopWithSavepointFunction;
                this.clusterShutdownSupplier = clusterShutdownSupplier;
+               this.deliverCoordinationRequestToCoordinatorFunction = 
deliverCoordinationRequestToCoordinatorFunction;
        }
 
        @Override
@@ -199,6 +210,15 @@ public class TestingRestfulGateway implements 
RestfulGateway {
        }
 
        @Override
+       public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       SerializedValue<CoordinationRequest> serializedRequest,
+                       Time timeout) {
+               return 
deliverCoordinationRequestToCoordinatorFunction.apply(jobId, operatorId, 
serializedRequest);
+       }
+
+       @Override
        public String getAddress() {
                return address;
        }
@@ -229,6 +249,7 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                protected BiFunction<JobID, JobVertexID, 
CompletableFuture<OperatorBackPressureStatsResponse>> 
requestOperatorBackPressureStatsFunction;
                protected BiFunction<JobID, String, CompletableFuture<String>> 
triggerSavepointFunction;
                protected BiFunction<JobID, String, CompletableFuture<String>> 
stopWithSavepointFunction;
+               protected TriFunction<JobID, OperatorID, 
SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestToCoordinatorFunction;
 
                protected AbstractBuilder() {
                        cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
@@ -243,6 +264,7 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                        triggerSavepointFunction = 
DEFAULT_TRIGGER_SAVEPOINT_FUNCTION;
                        stopWithSavepointFunction = 
DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION;
                        clusterShutdownSupplier = 
DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER;
+                       deliverCoordinationRequestToCoordinatorFunction = 
DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION;
                }
 
                public T setAddress(String address) {
@@ -315,6 +337,11 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                        return self();
                }
 
+               public T 
setDeliverCoordinationRequestToCoordinatorFunction(TriFunction<JobID, 
OperatorID, SerializedValue<CoordinationRequest>, 
CompletableFuture<CoordinationResponse>> 
deliverCoordinationRequestToCoordinatorFunction) {
+                       this.deliverCoordinationRequestToCoordinatorFunction = 
deliverCoordinationRequestToCoordinatorFunction;
+                       return self();
+               }
+
                protected abstract T self();
 
                public abstract TestingRestfulGateway build();
@@ -346,7 +373,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                                requestOperatorBackPressureStatsFunction,
                                triggerSavepointFunction,
                                stopWithSavepointFunction,
-                               clusterShutdownSupplier);
+                               clusterShutdownSupplier,
+                               
deliverCoordinationRequestToCoordinatorFunction);
                }
        }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 00bc2b9..e548f5e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -31,10 +31,13 @@ import 
org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.JobResult.Builder;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -274,6 +277,14 @@ public class RemoteStreamEnvironmentTest extends 
TestLogger {
                                @Nullable String savepointDirectory) {
                        return null;
                }
+
+               @Override
+               public CompletableFuture<CoordinationResponse> 
sendCoordinationRequest(
+                               JobID jobId,
+                               OperatorID operatorId,
+                               CoordinationRequest request) {
+                       return null;
+               }
        }
 
 }

Reply via email to