This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8650e40   [FLINK-17735][streaming] Add an iterator to collect sink 
results through coordination rest api
8650e40 is described below

commit 8650e409763a9b4eba92c77cc02ca22edc0230af
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Sun May 17 21:47:31 2020 +0800

     [FLINK-17735][streaming] Add an iterator to collect sink results through 
coordination rest api
    
    This closes #12073
---
 .../operators/collect/CollectResultFetcher.java    | 334 +++++++++++++++++++++
 .../operators/collect/CollectResultIterator.java   |  95 ++++++
 .../api/operators/collect/CollectSinkFunction.java |  33 +-
 .../collect/CollectSinkOperatorCoordinator.java    |  22 +-
 .../collect/CollectSinkOperatorFactory.java        |  21 +-
 .../collect/CollectResultIteratorTest.java         | 132 ++++++++
 .../operators/collect/CollectSinkFunctionTest.java | 206 ++++++++-----
 .../CollectSinkOperatorCoordinatorTest.java        |   6 +-
 .../collect/utils/CollectRequestSender.java        |  31 --
 .../operators/collect/utils/TestCollectClient.java | 141 ---------
 .../utils/TestCoordinationRequestHandler.java      | 213 +++++++++++++
 .../api/operators/collect/utils/TestJobClient.java | 133 ++++++++
 12 files changed, 1095 insertions(+), 272 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
new file mode 100644
index 0000000..ec7286b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A fetcher which fetches query results from sink and provides exactly-once 
semantics.
+ */
+public class CollectResultFetcher<T> {
+
+       private static final int DEFAULT_RETRY_MILLIS = 100;
+       private static final long DEFAULT_ACCUMULATOR_GET_MILLIS = 10000;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CollectResultFetcher.class);
+
+       private final CompletableFuture<OperatorID> operatorIdFuture;
+       private final String accumulatorName;
+       private final int retryMillis;
+
+       private ResultBuffer buffer;
+
+       @Nullable
+       private JobClient jobClient;
+       @Nullable
+       private CoordinationRequestGateway gateway;
+
+       private boolean jobTerminated;
+       private boolean closed;
+
+       public CollectResultFetcher(
+                       CompletableFuture<OperatorID> operatorIdFuture,
+                       TypeSerializer<T> serializer,
+                       String accumulatorName) {
+               this(
+                       operatorIdFuture,
+                       serializer,
+                       accumulatorName,
+                       DEFAULT_RETRY_MILLIS);
+       }
+
+       CollectResultFetcher(
+                       CompletableFuture<OperatorID> operatorIdFuture,
+                       TypeSerializer<T> serializer,
+                       String accumulatorName,
+                       int retryMillis) {
+               this.operatorIdFuture = operatorIdFuture;
+               this.accumulatorName = accumulatorName;
+               this.retryMillis = retryMillis;
+
+               this.buffer = new ResultBuffer(serializer);
+
+               this.jobTerminated = false;
+               this.closed = false;
+       }
+
+       public void setJobClient(JobClient jobClient) {
+               Preconditions.checkArgument(
+                       jobClient instanceof CoordinationRequestGateway,
+                       "Job client must be a CoordinationRequestGateway. This 
is a bug.");
+               this.jobClient = jobClient;
+               this.gateway = (CoordinationRequestGateway) jobClient;
+       }
+
+       public T next() throws IOException {
+               if (closed) {
+                       return null;
+               }
+
+               // this is to avoid sleeping before first try
+               boolean beforeFirstTry = true;
+               do {
+                       T res = buffer.next();
+                       if (res != null) {
+                               // we still have user-visible results, just use 
them
+                               return res;
+                       } else if (jobTerminated) {
+                               // no user-visible results, but job has 
terminated, we have to return
+                               return null;
+                       } else if (!beforeFirstTry) {
+                               // no results but job is still running, sleep 
before retry
+                               sleepBeforeRetry();
+                       }
+                       beforeFirstTry = false;
+
+                       if (isJobTerminated()) {
+                               // job terminated, read results from accumulator
+                               jobTerminated = true;
+                               Tuple2<Long, CollectCoordinationResponse<T>> 
accResults = getAccumulatorResults();
+                               buffer.dealWithResponse(accResults.f1, 
accResults.f0);
+                               buffer.complete();
+                       } else {
+                               // job still running, try to fetch some results
+                               long requestOffset = buffer.offset;
+                               CollectCoordinationResponse<T> response;
+                               try {
+                                       response = sendRequest(buffer.version, 
requestOffset);
+                               } catch (Exception e) {
+                                       LOG.warn("An exception occurs when 
fetching query results", e);
+                                       continue;
+                               }
+                               // the response will contain data (if any) 
starting exactly from requested offset
+                               buffer.dealWithResponse(response, 
requestOffset);
+                       }
+               } while (true);
+       }
+
+       public void close() {
+               if (closed) {
+                       return;
+               }
+
+               cancelJob();
+               closed = true;
+       }
+
+       @SuppressWarnings("unchecked")
+       private CollectCoordinationResponse<T> sendRequest(
+                       String version,
+                       long offset) throws InterruptedException, 
ExecutionException {
+               checkJobClientConfigured();
+
+               OperatorID operatorId = operatorIdFuture.getNow(null);
+               Preconditions.checkNotNull(operatorId, "Unknown operator ID. 
This is a bug.");
+
+               CollectCoordinationRequest request = new 
CollectCoordinationRequest(version, offset);
+               return (CollectCoordinationResponse<T>) 
gateway.sendCoordinationRequest(operatorId, request).get();
+       }
+
+       private Tuple2<Long, CollectCoordinationResponse<T>> 
getAccumulatorResults() throws IOException {
+               checkJobClientConfigured();
+
+               JobExecutionResult executionResult;
+               try {
+                       // this timeout is sort of hack, see comments in 
isJobTerminated for explanation
+                       executionResult = 
jobClient.getJobExecutionResult(getClass().getClassLoader()).get(
+                               DEFAULT_ACCUMULATOR_GET_MILLIS, 
TimeUnit.MILLISECONDS);
+               } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                       throw new IOException("Failed to fetch job execution 
result", e);
+               }
+
+               ArrayList<byte[]> accResults = 
executionResult.getAccumulatorResult(accumulatorName);
+               if (accResults == null) {
+                       // job terminates abnormally
+                       throw new IOException("Job terminated abnormally, no 
job execution result can be fetched");
+               }
+
+               try {
+                       List<byte[]> serializedResults =
+                               
SerializedListAccumulator.deserializeList(accResults, 
BytePrimitiveArraySerializer.INSTANCE);
+                       byte[] serializedResult = serializedResults.get(0);
+                       return 
CollectSinkFunction.deserializeAccumulatorResult(serializedResult);
+               } catch (ClassNotFoundException | IOException e) {
+                       // this is impossible
+                       throw new IOException("Failed to deserialize 
accumulator results", e);
+               }
+       }
+
+       private boolean isJobTerminated() {
+               checkJobClientConfigured();
+
+               try {
+                       JobStatus status = jobClient.getJobStatus().get();
+                       return status.isGloballyTerminalState();
+               } catch (Exception e) {
+                       // TODO
+                       //  This is sort of hack.
+                       //  Currently different execution environment will have 
different behaviors
+                       //  when fetching a finished job status.
+                       //  For example, standalone session cluster will return 
a normal FINISHED,
+                       //  while mini cluster will throw IllegalStateException,
+                       //  and yarn per job will throw 
ApplicationNotFoundException.
+                       //  We have to assume that job has finished in this 
case.
+                       //  Change this when these behaviors are unified.
+                       LOG.warn("Failed to get job status so we assume that 
the job has terminated. Some data might be lost.", e);
+                       return true;
+               }
+       }
+
+       private void cancelJob() {
+               checkJobClientConfigured();
+
+               if (!isJobTerminated()) {
+                       jobClient.cancel();
+               }
+       }
+
+       private void sleepBeforeRetry() {
+               if (retryMillis <= 0) {
+                       return;
+               }
+
+               try {
+                       // TODO a more proper retry strategy?
+                       Thread.sleep(retryMillis);
+               } catch (InterruptedException e) {
+                       LOG.warn("Interrupted when sleeping before a retry", e);
+               }
+       }
+
+       private void checkJobClientConfigured() {
+               Preconditions.checkNotNull(jobClient, "Job client must be 
configured before first use.");
+               Preconditions.checkNotNull(gateway, "Coordination request 
gateway must be configured before first use.");
+       }
+
+       /**
+        * A buffer which encapsulates the logic of dealing with the response 
from the {@link CollectSinkFunction}.
+        * See Java doc of {@link CollectSinkFunction} for explanation of this 
communication protocol.
+        */
+       private class ResultBuffer {
+
+               private static final String INIT_VERSION = "";
+
+               private final LinkedList<T> buffer;
+               private final TypeSerializer<T> serializer;
+
+               // for detailed explanation of the following 3 variables, see 
Java doc of CollectSinkFunction
+               // `version` is to check if the sink restarts
+               private String version;
+               // `offset` is the offset of the next result we want to fetch
+               private long offset;
+
+               // userVisibleHead <= user visible results offset < 
userVisibleTail
+               private long userVisibleHead;
+               private long userVisibleTail;
+
+               private ResultBuffer(TypeSerializer<T> serializer) {
+                       this.buffer = new LinkedList<>();
+                       this.serializer = serializer;
+
+                       this.version = INIT_VERSION;
+                       this.offset = 0;
+
+                       this.userVisibleHead = 0;
+                       this.userVisibleTail = 0;
+               }
+
+               private T next() {
+                       if (userVisibleHead == userVisibleTail) {
+                               return null;
+                       }
+                       T ret = buffer.removeFirst();
+                       userVisibleHead++;
+
+                       sanityCheck();
+                       return ret;
+               }
+
+               private void dealWithResponse(CollectCoordinationResponse<T> 
response, long responseOffset) throws IOException {
+                       String responseVersion = response.getVersion();
+                       long responseLastCheckpointedOffset = 
response.getLastCheckpointedOffset();
+                       List<T> results = response.getResults(serializer);
+
+                       // we first check version in the response to decide 
whether we should throw away dirty results
+                       if (!version.equals(responseVersion)) {
+                               // sink restarted, we revert back to where the 
sink tells us
+                               for (long i = 0; i < offset - 
responseLastCheckpointedOffset; i++) {
+                                       buffer.removeLast();
+                               }
+                               version = responseVersion;
+                               offset = responseLastCheckpointedOffset;
+                       }
+
+                       // we now check if more results can be seen by the user
+                       if (responseLastCheckpointedOffset > userVisibleTail) {
+                               // lastCheckpointedOffset increases, this means 
that more results have been
+                               // checkpointed, and we can give these results 
to the user
+                               userVisibleTail = 
responseLastCheckpointedOffset;
+                       }
+
+                       if (!results.isEmpty()) {
+                               // response contains some data, add them to 
buffer
+                               int addStart = (int) (offset - responseOffset);
+                               List<T> addedResults = 
results.subList(addStart, results.size());
+                               buffer.addAll(addedResults);
+                               offset += addedResults.size();
+                       }
+
+                       sanityCheck();
+               }
+
+               private void complete() {
+                       userVisibleTail = offset;
+               }
+
+               private void sanityCheck() {
+                       Preconditions.checkState(
+                               userVisibleHead <= userVisibleTail,
+                               "userVisibleHead should not be larger than 
userVisibleTail. This is a bug.");
+                       Preconditions.checkState(
+                               userVisibleTail <= offset,
+                               "userVisibleTail should not be larger than 
offset. This is a bug.");
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
new file mode 100644
index 0000000..bbf41e1
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An iterator which iterates through the results of a query job.
+ *
+ * <p>NOTE: After using this iterator, the close method MUST be called in 
order to release job related resources.
+ */
+public class CollectResultIterator<T> implements Iterator<T>, AutoCloseable {
+
+       private final CollectResultFetcher<T> fetcher;
+       private T bufferedResult;
+
+       public CollectResultIterator(
+                       CompletableFuture<OperatorID> operatorIdFuture,
+                       TypeSerializer<T> serializer,
+                       String accumulatorName) {
+               this.fetcher = new CollectResultFetcher<>(operatorIdFuture, 
serializer, accumulatorName);
+               this.bufferedResult = null;
+       }
+
+       @VisibleForTesting
+       public CollectResultIterator(
+                       CompletableFuture<OperatorID> operatorIdFuture,
+                       TypeSerializer<T> serializer,
+                       String accumulatorName,
+                       int retryMillis) {
+               this.fetcher = new CollectResultFetcher<>(operatorIdFuture, 
serializer, accumulatorName, retryMillis);
+               this.bufferedResult = null;
+       }
+
+       @Override
+       public boolean hasNext() {
+               // we have to make sure that the next result exists
+               // it is possible that there is no more result but the job is 
still running
+               if (bufferedResult == null) {
+                       bufferedResult = nextResultFromFetcher();
+               }
+               return bufferedResult != null;
+       }
+
+       @Override
+       public T next() {
+               if (bufferedResult == null) {
+                       bufferedResult = nextResultFromFetcher();
+               }
+               T ret = bufferedResult;
+               bufferedResult = null;
+               return ret;
+       }
+
+       @Override
+       public void close() throws Exception {
+               fetcher.close();
+       }
+
+       public void setJobClient(JobClient jobClient) {
+               fetcher.setJobClient(jobClient);
+       }
+
+       private T nextResultFromFetcher() {
+               try {
+                       return fetcher.next();
+               } catch (IOException e) {
+                       fetcher.close();
+                       throw new RuntimeException("Failed to fetch next 
result", e);
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
index 21e889b..64a3ffe 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.operators.collect;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
@@ -78,7 +79,9 @@ import java.util.concurrent.locks.ReentrantLock;
  *         before this offset. Sink can safely throw these results away.</li>
  *     <li><strong>lastCheckpointedOffset</strong>:
  *         This is the value of <code>offset</code> when the checkpoint 
happens. This value will be
- *         restored from the checkpoint and set back to <code>offset</code> 
when the sink restarts.</li>
+ *         restored from the checkpoint and set back to <code>offset</code> 
when the sink restarts.
+ *         Clients who need exactly-once semantics need to rely on this value 
for the position to
+ *         revert when a failover happens.</li>
  * </ol>
  *
  * <p>Client will put <code>version</code> and <code>offset</code> into the 
request, indicating that
@@ -97,9 +100,9 @@ import java.util.concurrent.locks.ReentrantLock;
  * <ol>
  *     <li>If the version mismatches, client knows that sink has restarted. It 
will throw away all uncheckpointed
  *         results after <code>lastCheckpointedOffset</code>.</li>
- *     <li>Otherwise the version matches. If 
<code>lastCheckpointedOffset</code> increases, client knows that
- *         a checkpoint happens. It can now move all results before this 
offset to a user-visible buffer. If
- *         the response also contains new results, client will now move these 
new results into uncheckpointed
+ *     <li>If <code>lastCheckpointedOffset</code> increases, client knows that
+ *         a checkpoint happens. It can now move all results before this 
offset to a user-visible buffer.</li>
+ *     <li>If the response also contains new results, client will now move 
these new results into uncheckpointed
  *         buffer.</li>
  * </ol>
  *
@@ -264,7 +267,9 @@ public class CollectSinkFunction<IN> extends 
RichSinkFunction<IN> implements Che
                        // put results not consumed by the client into the 
accumulator
                        // so that we do not block the closing procedure while 
not throwing results away
                        SerializedListAccumulator<byte[]> accumulator = new 
SerializedListAccumulator<>();
-                       accumulator.add(serializeAccumulatorResult(), 
BytePrimitiveArraySerializer.INSTANCE);
+                       accumulator.add(
+                               serializeAccumulatorResult(offset, version, 
lastCheckpointedOffset, bufferedResults, serializer),
+                               BytePrimitiveArraySerializer.INSTANCE);
                        getRuntimeContext().addAccumulator(accumulatorName, 
accumulator);
                } finally {
                        bufferedResultsLock.unlock();
@@ -286,22 +291,28 @@ public class CollectSinkFunction<IN> extends 
RichSinkFunction<IN> implements Che
                this.eventGateway = eventGateway;
        }
 
-       private byte[] serializeAccumulatorResult() throws IOException {
+       @VisibleForTesting
+       public static <T> byte[] serializeAccumulatorResult(
+                       long offset,
+                       String version,
+                       long lastCheckpointedOffset,
+                       List<T> bufferedResults,
+                       TypeSerializer<T> serializer) throws IOException {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
                wrapper.writeLong(offset);
-               CollectCoordinationResponse<IN> finalResponse =
+               CollectCoordinationResponse<T> finalResponse =
                        new CollectCoordinationResponse<>(version, 
lastCheckpointedOffset, bufferedResults, serializer);
                finalResponse.serialize(wrapper);
                return baos.toByteArray();
        }
 
-       public static Tuple2<Long, CollectCoordinationResponse> 
deserializeAccumulatorResult(
+       public static <T> Tuple2<Long, CollectCoordinationResponse<T>> 
deserializeAccumulatorResult(
                        byte[] serializedAccResults) throws IOException {
                ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedAccResults);
                DataInputViewStreamWrapper wrapper = new 
DataInputViewStreamWrapper(bais);
                long token = wrapper.readLong();
-               CollectCoordinationResponse finalResponse = new 
CollectCoordinationResponse(wrapper);
+               CollectCoordinationResponse<T> finalResponse = new 
CollectCoordinationResponse<>(wrapper);
                return Tuple2.of(token, finalResponse);
        }
 
@@ -395,8 +406,8 @@ public class CollectSinkFunction<IN> extends 
RichSinkFunction<IN> implements Che
 
                private void close() {
                        running = false;
+                       closeServerSocket();
                        closeCurrentConnection();
-                       closeServer();
                }
 
                private InetSocketAddress getServerSocketAddress() {
@@ -444,7 +455,7 @@ public class CollectSinkFunction<IN> extends 
RichSinkFunction<IN> implements Che
                        }
                }
 
-               private void closeServer() {
+               private void closeServerSocket() {
                        try {
                                serverSocket.close();
                        } catch (Exception e) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index dea9bf3..6c84266 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -56,6 +56,8 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
 
        private static final Logger LOG = 
LoggerFactory.getLogger(CollectSinkOperatorCoordinator.class);
 
+       private final int socketTimeout;
+
        private InetSocketAddress address;
        private Socket socket;
        private DataInputViewStreamWrapper inStream;
@@ -63,6 +65,10 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
 
        private ExecutorService executorService;
 
+       public CollectSinkOperatorCoordinator(int socketTimeout) {
+               this.socketTimeout = socketTimeout;
+       }
+
        @Override
        public void start() throws Exception {
                this.executorService =
@@ -107,12 +113,20 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
                        CollectCoordinationRequest request,
                        CompletableFuture<CoordinationResponse> responseFuture,
                        InetSocketAddress sinkAddress) {
+               if (sinkAddress == null) {
+                       closeConnection();
+                       completeWithEmptyResponse(request, responseFuture);
+                       return;
+               }
+
                try {
                        if (socket == null) {
-                               socket = new Socket(sinkAddress.getAddress(), 
sinkAddress.getPort());
+                               socket = new Socket();
+                               socket.setSoTimeout(socketTimeout);
                                socket.setKeepAlive(true);
                                socket.setTcpNoDelay(true);
 
+                               socket.connect(sinkAddress);
                                inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
                                outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
                                LOG.info("Sink connection established");
@@ -200,9 +214,11 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
        public static class Provider implements OperatorCoordinator.Provider {
 
                private final OperatorID operatorId;
+               private final int socketTimeout;
 
-               public Provider(OperatorID operatorId) {
+               public Provider(OperatorID operatorId, int socketTimeout) {
                        this.operatorId = operatorId;
+                       this.socketTimeout = socketTimeout;
                }
 
                @Override
@@ -213,7 +229,7 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
                @Override
                public OperatorCoordinator create(Context context) {
                        // we do not send operator event so we don't need a 
context
-                       return new CollectSinkOperatorCoordinator();
+                       return new 
CollectSinkOperatorCoordinator(socketTimeout);
                }
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
index 8fa9843..e177f79 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.operators.collect;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
@@ -28,19 +29,25 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 /**
  * The Factory class for {@link CollectSinkOperator}.
  */
-@SuppressWarnings("unchecked")
-public class CollectSinkOperatorFactory extends 
SimpleUdfStreamOperatorFactory<Object> implements 
CoordinatedOperatorFactory<Object> {
+public class CollectSinkOperatorFactory<IN> extends 
SimpleUdfStreamOperatorFactory<Object> implements 
CoordinatedOperatorFactory<Object> {
 
        private static final long serialVersionUID = 1L;
 
-       private final CollectSinkOperator<?> operator;
+       private final CollectSinkOperator<IN> operator;
+       private final int socketTimeout;
 
-       public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) {
-               super(operator);
-               this.operator = operator;
+       public CollectSinkOperatorFactory(
+                       TypeSerializer<IN> serializer,
+                       int maxResultsPerBatch,
+                       String accumulatorName,
+                       int socketTimeout) {
+               super(new CollectSinkOperator<>(serializer, maxResultsPerBatch, 
accumulatorName));
+               this.operator = (CollectSinkOperator<IN>) getOperator();
+               this.socketTimeout = socketTimeout;
        }
 
        @Override
+       @SuppressWarnings("unchecked")
        public <T extends StreamOperator<Object>> T  
createStreamOperator(StreamOperatorParameters<Object> parameters) {
                final OperatorID operatorId = 
parameters.getStreamConfig().getOperatorID();
                final OperatorEventDispatcher eventDispatcher = 
parameters.getOperatorEventDispatcher();
@@ -55,6 +62,6 @@ public class CollectSinkOperatorFactory extends 
SimpleUdfStreamOperatorFactory<O
        @Override
        public OperatorCoordinator.Provider getCoordinatorProvider(String 
operatorName, OperatorID operatorID) {
                operator.getOperatorIdFuture().complete(operatorID);
-               return new CollectSinkOperatorCoordinator.Provider(operatorID);
+               return new CollectSinkOperatorCoordinator.Provider(operatorID, 
socketTimeout);
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
new file mode 100644
index 0000000..021689f
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.streaming.api.operators.collect.utils.TestCoordinationRequestHandler;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for {@link CollectResultIterator}.
+ */
+public class CollectResultIteratorTest extends TestLogger {
+
+       private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
+       private static final JobID TEST_JOB_ID = new JobID();
+       private static final String ACCUMULATOR_NAME = "accumulatorName";
+
+       @Test
+       public void testIteratorWithCheckpointAndFailure() throws Exception {
+               // run this random test multiple times
+               for (int testCount = 1000; testCount > 0; testCount--) {
+                       List<Integer> expected = new ArrayList<>();
+                       for (int i = 0; i < 200; i++) {
+                               expected.add(i);
+                       }
+
+                       CollectResultIterator<Integer> iterator = 
createIteratorAndJobClient(expected, IntSerializer.INSTANCE).f0;
+
+                       List<Integer> actual = new ArrayList<>();
+                       while (iterator.hasNext()) {
+                               actual.add(iterator.next());
+                       }
+                       Assert.assertEquals(expected.size(), actual.size());
+
+                       Collections.sort(expected);
+                       Collections.sort(actual);
+                       Assert.assertArrayEquals(expected.toArray(new 
Integer[0]), actual.toArray(new Integer[0]));
+
+                       iterator.close();
+               }
+       }
+
+       @Test
+       public void testEarlyClose() throws Exception {
+               List<Integer> expected = new ArrayList<>();
+               for (int i = 0; i < 200; i++) {
+                       expected.add(i);
+               }
+
+               Tuple2<CollectResultIterator<Integer>, JobClient> tuple2 =
+                       createIteratorAndJobClient(expected, 
IntSerializer.INSTANCE);
+               CollectResultIterator<Integer> iterator = tuple2.f0;
+               JobClient jobClient = tuple2.f1;
+
+               for (int i = 0; i < 100; i++) {
+                       Assert.assertTrue(iterator.hasNext());
+                       Assert.assertNotNull(iterator.next());
+               }
+               Assert.assertTrue(iterator.hasNext());
+               iterator.close();
+
+               Assert.assertEquals(JobStatus.CANCELED, 
jobClient.getJobStatus().get());
+       }
+
+       private Tuple2<CollectResultIterator<Integer>, JobClient> 
createIteratorAndJobClient(
+                       List<Integer> expected,
+                       TypeSerializer<Integer> serializer) {
+               CollectResultIterator<Integer> iterator = new 
CollectResultIterator<>(
+                       CompletableFuture.completedFuture(TEST_OPERATOR_ID),
+                       serializer,
+                       ACCUMULATOR_NAME,
+                       0);
+
+               TestCoordinationRequestHandler<Integer> handler = new 
TestCoordinationRequestHandler<>(
+                       expected, serializer, ACCUMULATOR_NAME);
+
+               TestJobClient.JobInfoProvider infoProvider = new 
TestJobClient.JobInfoProvider() {
+
+                       @Override
+                       public boolean isJobFinished() {
+                               return handler.isClosed();
+                       }
+
+                       @Override
+                       public Map<String, OptionalFailure<Object>> 
getAccumulatorResults() {
+                               return handler.getAccumulatorResults();
+                       }
+               };
+
+               TestJobClient jobClient = new TestJobClient(
+                       TEST_JOB_ID,
+                       TEST_OPERATOR_ID,
+                       handler,
+                       infoProvider);
+               iterator.setJobClient(jobClient);
+
+               return Tuple2.of(iterator, jobClient);
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
index 1483998..1363d87 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
@@ -17,25 +17,24 @@
 
 package org.apache.flink.streaming.api.operators.collect;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import 
org.apache.flink.streaming.api.operators.collect.utils.CollectRequestSender;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionInitializationContext;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
-import 
org.apache.flink.streaming.api.operators.collect.utils.TestCollectClient;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
-import org.apache.flink.types.Row;
+import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -47,11 +46,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -61,13 +62,16 @@ public class CollectSinkFunctionTest extends TestLogger {
 
        private static final int MAX_RESULTS_PER_BATCH = 3;
        private static final String ACCUMULATOR_NAME = 
"tableCollectAccumulator";
-       private static final long TIME_OUT_MILLIS = 10000;
+       private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+       private static final int SOCKET_TIMEOUT_MILLIS = 1000;
        private static final int MAX_RETIRES = 100;
 
-       private static final TypeSerializer<Row> serializer =
-               new 
RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO).createSerializer(new 
ExecutionConfig());
+       private static final JobID TEST_JOB_ID = new JobID();
+       private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
 
-       private CollectSinkFunction<Row> function;
+       private static final TypeSerializer<Integer> serializer = 
IntSerializer.INSTANCE;
+
+       private CollectSinkFunction<Integer> function;
        private CollectSinkOperatorCoordinator coordinator;
        private MockFunctionInitializationContext functionInitializationContext;
        private boolean jobFinished;
@@ -81,7 +85,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                ioManager = new IOManagerAsync();
                runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
ioManager);
                gateway = new MockOperatorEventGateway();
-               coordinator = new CollectSinkOperatorCoordinator();
+               coordinator = new 
CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
                coordinator.start();
 
                // only used in checkpointed tests
@@ -101,10 +105,10 @@ public class CollectSinkFunctionTest extends TestLogger {
                openFunction();
                for (int i = 0; i < 6; i++) {
                        // CollectSinkFunction never use context when invoked
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
-               CollectCoordinationResponse<Row> response = 
sendRequestAndGetValidResponse("", 0);
+               CollectCoordinationResponse<Integer> response = 
sendRequestAndGetValidResponse("", 0);
                Assert.assertEquals(0, response.getLastCheckpointedOffset());
                String version = response.getVersion();
 
@@ -118,7 +122,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 0, 
Collections.emptyList());
 
                for (int i = 6; i < 10; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                // invalid request
@@ -135,7 +139,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 0, 
Collections.emptyList());
 
                for (int i = 10; i < 16; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 12);
@@ -151,10 +155,10 @@ public class CollectSinkFunctionTest extends TestLogger {
                openFunctionWithState();
                for (int i = 0; i < 2; i++) {
                        // CollectSinkFunction never use context when invoked
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
-               CollectCoordinationResponse<Row> response = 
sendRequestAndGetValidResponse("", 0);
+               CollectCoordinationResponse<Integer> response = 
sendRequestAndGetValidResponse("", 0);
                Assert.assertEquals(0, response.getLastCheckpointedOffset());
                String version = response.getVersion();
 
@@ -162,7 +166,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 0, Arrays.asList(0, 1));
 
                for (int i = 2; i < 6; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 3);
@@ -181,7 +185,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 3, Arrays.asList(4, 5));
 
                for (int i = 6; i < 9; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 6);
@@ -192,7 +196,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                openFunctionWithState();
 
                for (int i = 9; i < 12; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 4);
@@ -208,7 +212,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                checkpointFunction(2);
                checkpointComplete(2);
 
-               function.invoke(Row.of(12), null);
+               function.invoke(12, null);
 
                response = sendRequestAndGetValidResponse(version, 7);
                assertResponseEquals(response, version, 6, Arrays.asList(10, 
11, 12));
@@ -228,7 +232,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 6, 
Collections.emptyList());
 
                for (int i = 13; i < 17; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 9);
@@ -249,7 +253,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 9, 
Collections.singletonList(16));
 
                for (int i = 17; i < 20; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 12);
@@ -270,7 +274,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResponseEquals(response, version, 9, 
Collections.singletonList(16));
 
                for (int i = 20; i < 23; i++) {
-                       function.invoke(Row.of(i), null);
+                       function.invoke(i, null);
                }
 
                response = sendRequestAndGetValidResponse(version, 12);
@@ -290,13 +294,9 @@ public class CollectSinkFunctionTest extends TestLogger {
                                expected.add(i);
                        }
                        UncheckpointedDataFeeder feeder = new 
UncheckpointedDataFeeder(expected);
-                       TestCollectClient<Row> client = new TestCollectClient<>(
-                               serializer,
-                               new TestCollectRequestSender(),
-                               () -> jobFinished);
 
-                       runFunctionWithClient(feeder, client);
-                       assertResultsEqualAfterSort(expected, 
client.getResults());
+                       List<Integer> actual = runFunctionRandomTest(feeder);
+                       assertResultsEqualAfterSort(expected, actual);
 
                        after();
                        before();
@@ -312,23 +312,22 @@ public class CollectSinkFunctionTest extends TestLogger {
                                expected.add(i);
                        }
                        CheckpointedDataFeeder feeder = new 
CheckpointedDataFeeder(expected);
-                       TestCollectClient<Row> client = new TestCollectClient<>(
-                               serializer,
-                               new TestCollectRequestSender(),
-                               () -> jobFinished);
 
-                       runFunctionWithClient(feeder, client);
-                       assertResultsEqualAfterSort(expected, 
client.getResults());
+                       List<Integer> actual = runFunctionRandomTest(feeder);
+                       assertResultsEqualAfterSort(expected, actual);
 
                        after();
                        before();
                }
        }
 
-       private void runFunctionWithClient(Thread feeder, Thread client) throws 
Exception {
+       private List<Integer> runFunctionRandomTest(Thread feeder) throws 
Exception {
+               CollectClient client = new CollectClient();
+
                Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
                        feeder.interrupt();
                        client.interrupt();
+                       e.printStackTrace();
                };
                feeder.setUncaughtExceptionHandler(exceptionHandler);
                client.setUncaughtExceptionHandler(exceptionHandler);
@@ -337,10 +336,13 @@ public class CollectSinkFunctionTest extends TestLogger {
                client.start();
                feeder.join();
                client.join();
+
+               return client.results;
        }
 
        private void openFunction() throws Exception {
-               function = new CollectSinkFunction<>(serializer, 
MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
+               function = new CollectSinkFunction<>(
+                       serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
                function.setRuntimeContext(runtimeContext);
                function.setOperatorEventGateway(gateway);
                function.open(new Configuration());
@@ -349,7 +351,8 @@ public class CollectSinkFunctionTest extends TestLogger {
 
        private void openFunctionWithState() throws Exception {
                
functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
-               function = new CollectSinkFunction<>(serializer, 
MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
+               function = new CollectSinkFunction<>(
+                       serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME);
                function.setRuntimeContext(runtimeContext);
                function.setOperatorEventGateway(gateway);
                function.initializeState(functionInitializationContext);
@@ -382,19 +385,19 @@ public class CollectSinkFunctionTest extends TestLogger {
        }
 
        @SuppressWarnings("unchecked")
-       private CollectCoordinationResponse<Row> sendRequest(
+       private CollectCoordinationResponse<Integer> sendRequest(
                        String version,
                        long offset) throws Exception {
                CollectCoordinationRequest request = new 
CollectCoordinationRequest(version, offset);
                // we add a timeout to not block the tests
                return ((CollectCoordinationResponse) coordinator
-                       
.handleCoordinationRequest(request).get(TIME_OUT_MILLIS, 
TimeUnit.MILLISECONDS));
+                       
.handleCoordinationRequest(request).get(FUTURE_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS));
        }
 
-       private CollectCoordinationResponse<Row> sendRequestAndGetValidResponse(
+       private CollectCoordinationResponse<Integer> 
sendRequestAndGetValidResponse(
                        String version,
                        long offset) throws Exception {
-               CollectCoordinationResponse<Row> response;
+               CollectCoordinationResponse<Integer> response;
                for (int i = 0; i < MAX_RETIRES; i++) {
                        response = sendRequest(version, offset);
                        if (response.getLastCheckpointedOffset() >= 0) {
@@ -405,7 +408,7 @@ public class CollectSinkFunctionTest extends TestLogger {
        }
 
        @SuppressWarnings("unchecked")
-       private Tuple2<Long, CollectCoordinationResponse> 
getAccumualtorResults() throws Exception {
+       private Tuple2<Long, CollectCoordinationResponse<Integer>> 
getAccumualtorResults() throws Exception {
                Accumulator accumulator = 
runtimeContext.getAccumulator(ACCUMULATOR_NAME);
                ArrayList<byte[]> accLocalValue = ((SerializedListAccumulator) 
accumulator).getLocalValue();
                List<byte[]> serializedResults =
@@ -416,41 +419,35 @@ public class CollectSinkFunctionTest extends TestLogger {
        }
 
        private void assertResponseEquals(
-                       CollectCoordinationResponse<Row> response,
+                       CollectCoordinationResponse<Integer> response,
                        String version,
                        long lastCheckpointedOffset,
                        List<Integer> expected) throws IOException {
                Assert.assertEquals(version, response.getVersion());
                Assert.assertEquals(lastCheckpointedOffset, 
response.getLastCheckpointedOffset());
-               List<Row> results = response.getResults(serializer);
+               List<Integer> results = response.getResults(serializer);
                assertResultsEqual(expected, results);
        }
 
-       private void assertResultsEqual(List<Integer> expected, List<Row> 
actual) {
-               Assert.assertEquals(expected.size(), actual.size());
-               for (int i = 0; i < expected.size(); i++) {
-                       Row row = actual.get(i);
-                       Assert.assertEquals(1, row.getArity());
-                       Assert.assertEquals(expected.get(i), row.getField(0));
-               }
+       private void assertResultsEqual(List<Integer> expected, List<Integer> 
actual) {
+               Assert.assertArrayEquals(expected.toArray(new Integer[0]), 
actual.toArray(new Integer[0]));
        }
 
-       private void assertResultsEqualAfterSort(List<Integer> expected, 
List<Row> actual) {
+       private void assertResultsEqualAfterSort(List<Integer> expected, 
List<Integer> actual) {
                Collections.sort(expected);
-               actual.sort(Comparator.comparingInt(row -> (int) 
row.getField(0)));
+               Collections.sort(actual);
                assertResultsEqual(expected, actual);
        }
 
-       @SuppressWarnings("unchecked")
        private void assertAccumulatorResult(
                        long expectedOffset,
                        String expectedVersion,
                        long expectedLastCheckpointedOffset,
                        List<Integer> expectedResults) throws Exception {
-               Tuple2<Long, CollectCoordinationResponse> accResults = 
getAccumualtorResults();
+               Tuple2<Long, CollectCoordinationResponse<Integer>> accResults = 
getAccumualtorResults();
                long offset = accResults.f0;
-               CollectCoordinationResponse response = accResults.f1;
-               List<Row> actualResults = response.getResults(serializer);
+               CollectCoordinationResponse<Integer> response = accResults.f1;
+               List<Integer> actualResults = response.getResults(serializer);
 
                Assert.assertEquals(expectedOffset, offset);
                Assert.assertEquals(expectedVersion, response.getVersion());
@@ -458,19 +455,6 @@ public class CollectSinkFunctionTest extends TestLogger {
                assertResultsEqual(expectedResults, actualResults);
        }
 
-       private class TestCollectRequestSender implements 
CollectRequestSender<Row> {
-
-               @Override
-               public CollectCoordinationResponse<Row> sendRequest(String 
version, long offset) throws Exception {
-                       return 
CollectSinkFunctionTest.this.sendRequest(version, offset);
-               }
-
-               @Override
-               public Tuple2<Long, CollectCoordinationResponse> 
getAccumulatorResults() throws Exception {
-                       return 
CollectSinkFunctionTest.this.getAccumualtorResults();
-               }
-       }
-
        /**
         * A thread feeding data to the function. It will fail when half of the 
data is fed.
         */
@@ -496,7 +480,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                                while (data.size() > 0) {
                                        int size = Math.min(data.size(), 
random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1);
                                        for (int i = 0; i < size; i++) {
-                                               
function.invoke(Row.of(data.removeFirst()), null);
+                                               
function.invoke(data.removeFirst(), null);
                                        }
 
                                        if (!failedBefore && data.size() < 
checkpointedData.size() / 2) {
@@ -519,7 +503,6 @@ public class CollectSinkFunctionTest extends TestLogger {
 
                                finishJob();
                        } catch (Exception e) {
-                               e.printStackTrace();
                                throw new RuntimeException(e);
                        }
                }
@@ -571,7 +554,7 @@ public class CollectSinkFunctionTest extends TestLogger {
                                                // with 60% chance we add some 
data
                                                int size = 
Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1);
                                                for (int i = 0; i < size; i++) {
-                                                       
function.invoke(Row.of(data.removeFirst()), null);
+                                                       
function.invoke(data.removeFirst(), null);
                                                }
                                        } else if (r < 9) {
                                                // with 30% chance we make a 
checkpoint
@@ -603,12 +586,14 @@ public class CollectSinkFunctionTest extends TestLogger {
 
                                finishJob();
                        } catch (Exception e) {
-                               e.printStackTrace();
                                throw new RuntimeException(e);
                        }
                }
        }
 
+       /**
+        * Countdown for a checkpoint which will succeed in the future.
+        */
        private static class CheckpointCountdown {
 
                private long id;
@@ -629,4 +614,71 @@ public class CollectSinkFunctionTest extends TestLogger {
                        return false;
                }
        }
+
+       /**
+        * A thread collecting results with the collecting iterator.
+        */
+       private class CollectClient extends Thread {
+
+               private List<Integer> results;
+               private CollectResultIterator<Integer> iterator;
+
+               private CollectClient() {
+                       this.results = new ArrayList<>();
+
+                       this.iterator = new CollectResultIterator<>(
+                               
CompletableFuture.completedFuture(TEST_OPERATOR_ID),
+                               serializer,
+                               ACCUMULATOR_NAME,
+                               0
+                       );
+
+                       TestJobClient.JobInfoProvider infoProvider = new 
TestJobClient.JobInfoProvider() {
+
+                               @Override
+                               public boolean isJobFinished() {
+                                       return jobFinished;
+                               }
+
+                               @Override
+                               public Map<String, OptionalFailure<Object>> 
getAccumulatorResults() {
+                                       Map<String, OptionalFailure<Object>> 
accumulatorResults = new HashMap<>();
+                                       accumulatorResults.put(
+                                               ACCUMULATOR_NAME,
+                                               
OptionalFailure.of(runtimeContext.getAccumulator(ACCUMULATOR_NAME).getLocalValue()));
+                                       return accumulatorResults;
+                               }
+                       };
+
+                       TestJobClient jobClient = new TestJobClient(
+                               TEST_JOB_ID,
+                               TEST_OPERATOR_ID,
+                               coordinator,
+                               infoProvider);
+
+                       iterator.setJobClient(jobClient);
+               }
+
+               @Override
+               public void run() {
+                       Random random = new Random();
+
+                       while (iterator.hasNext()) {
+                               results.add(iterator.next());
+                               if (random.nextBoolean()) {
+                                       try {
+                                               Thread.sleep(5);
+                                       } catch (InterruptedException e) {
+                                               // ignore
+                                       }
+                               }
+                       }
+
+                       try {
+                               iterator.close();
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index d2ad57b..66a624a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -45,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
  */
 public class CollectSinkOperatorCoordinatorTest {
 
+       private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+
        private static final TypeSerializer<Row> serializer = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO
@@ -52,7 +54,7 @@ public class CollectSinkOperatorCoordinatorTest {
 
        @Test
        public void testNoAddress() throws Exception {
-               CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator();
+               CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
                coordinator.start();
 
                CollectCoordinationRequest request = new 
CollectCoordinationRequest("version", 123);
@@ -65,7 +67,7 @@ public class CollectSinkOperatorCoordinatorTest {
 
        @Test
        public void testServerFailure() throws Exception {
-               CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator();
+               CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
                coordinator.start();
 
                List<List<Row>> expected = Arrays.asList(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
deleted file mode 100644
index fc7b759..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.collect.utils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
-
-/**
- * Testing interface for sending collect requests.
- */
-public interface CollectRequestSender<T> {
-
-       CollectCoordinationResponse<T> sendRequest(String version, long offset) 
throws Exception;
-
-       Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() 
throws Exception;
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
deleted file mode 100644
index 9040971..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.collect.utils;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.function.BooleanSupplier;
-
-/**
- * A simple client for fetching collect results.
- */
-public class TestCollectClient<T> extends Thread {
-
-       private static final String INIT_VERSION = "";
-       private static final int MAX_RETRY_COUNT = 100;
-
-       private final TypeSerializer<T> serializer;
-       private final CollectRequestSender<T> sender;
-       private final BooleanSupplier jobFinishedChecker;
-
-       private final LinkedList<T> uncheckpointedResults;
-       private final LinkedList<T> checkpointedResults;
-
-       private String version;
-       private long offset;
-       private long lastCheckpointedOffset;
-       private int retryCount;
-
-       public TestCollectClient(
-                       TypeSerializer<T> serializer,
-                       CollectRequestSender<T> sender,
-                       BooleanSupplier jobFinishedChecker) {
-               this.serializer = serializer;
-               this.sender = sender;
-               this.jobFinishedChecker = jobFinishedChecker;
-
-               this.uncheckpointedResults = new LinkedList<>();
-               this.checkpointedResults = new LinkedList<>();
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void run() {
-               Random random = new Random();
-
-               version = INIT_VERSION;
-               offset = 0;
-               lastCheckpointedOffset = 0;
-               retryCount = 0;
-
-               try {
-                       while (!jobFinishedChecker.getAsBoolean()) {
-                               if (random.nextBoolean()) {
-                                       Thread.sleep(random.nextInt(10));
-                               }
-                               CollectCoordinationResponse<T> response = 
sender.sendRequest(version, offset);
-                               dealWithResponse(response, offset);
-                       }
-
-                       Tuple2<Long, CollectCoordinationResponse> accResults = 
sender.getAccumulatorResults();
-                       dealWithResponse(accResults.f1, accResults.f0);
-                       checkpointedResults.addAll(uncheckpointedResults);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       throw new RuntimeException(e);
-               }
-       }
-
-       public List<T> getResults() {
-               return checkpointedResults;
-       }
-
-       private void dealWithResponse(CollectCoordinationResponse<T> response, 
long responseOffset) throws IOException {
-               String responseVersion = response.getVersion();
-               long responseLastCheckpointedOffset = 
response.getLastCheckpointedOffset();
-               List<T> responseResults = response.getResults(serializer);
-
-               if (responseResults.isEmpty()) {
-                       retryCount++;
-               } else {
-                       retryCount = 0;
-               }
-               if (retryCount > MAX_RETRY_COUNT) {
-                       // not to block the tests
-                       throw new RuntimeException("Too many retries in 
TestCollectClient");
-               }
-
-               if (INIT_VERSION.equals(version)) {
-                       // first response, update version accordingly
-                       version = responseVersion;
-               } else {
-                       if (responseLastCheckpointedOffset > 
lastCheckpointedOffset) {
-                               // a new checkpoint happens
-                               int newCheckpointedNum = (int) 
(responseLastCheckpointedOffset - lastCheckpointedOffset);
-                               for (int i = 0; i < newCheckpointedNum; i++) {
-                                       T result = 
uncheckpointedResults.removeFirst();
-                                       checkpointedResults.add(result);
-                               }
-                               lastCheckpointedOffset = 
responseLastCheckpointedOffset;
-                       }
-
-                       if (!version.equals(responseVersion)) {
-                               // sink has restarted
-                               int removeNum = (int) (offset - 
lastCheckpointedOffset);
-                               for (int i = 0; i < removeNum; i++) {
-                                       uncheckpointedResults.removeLast();
-                               }
-                               version = responseVersion;
-                               offset = lastCheckpointedOffset;
-                       }
-
-                       if (responseResults.size() > 0) {
-                               int addStart = (int) (offset - responseOffset);
-                               List<T> resultsToAdd = 
responseResults.subList(addStart, responseResults.size());
-                               uncheckpointedResults.addAll(resultsToAdd);
-                               offset += resultsToAdd.size();
-                       }
-               }
-       }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
new file mode 100644
index 0000000..aecf864
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import 
org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import 
org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.util.OptionalFailure;
+
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link CoordinationRequestHandler} to test fetching SELECT query results.
+ */
+public class TestCoordinationRequestHandler<T> implements 
CoordinationRequestHandler {
+
+       private static final int BATCH_SIZE = 3;
+
+       private final TypeSerializer<T> serializer;
+       private final String accumulatorName;
+
+       private int checkpointCountDown;
+
+       private LinkedList<T> data;
+       private List<T> checkpointingData;
+       private List<T> checkpointedData;
+
+       private LinkedList<T> buffered;
+       private List<T> checkpointingBuffered;
+       private List<T> checkpointedBuffered;
+
+       private String version;
+
+       private long offset;
+       private long checkpointingOffset;
+       private long checkpointedOffset;
+
+       private Map<String, OptionalFailure<Object>> accumulatorResults;
+
+       private Random random;
+       private boolean closed;
+
+       public TestCoordinationRequestHandler(
+                       List<T> data,
+                       TypeSerializer<T> serializer,
+                       String accumulatorName) {
+               this.serializer = serializer;
+               this.accumulatorName = accumulatorName;
+
+               this.checkpointCountDown = 0;
+
+               this.data = new LinkedList<>(data);
+               this.checkpointedData = new ArrayList<>(data);
+
+               this.buffered = new LinkedList<>();
+               this.checkpointedBuffered = new ArrayList<>();
+
+               this.version = UUID.randomUUID().toString();
+
+               this.offset = 0;
+               this.checkpointingOffset = 0;
+               this.checkpointedOffset = 0;
+
+               this.accumulatorResults = new HashMap<>();
+
+               this.random = new Random();
+               this.closed = false;
+       }
+
+       @Override
+       public CompletableFuture<CoordinationResponse> 
handleCoordinationRequest(CoordinationRequest request) {
+               if (closed) {
+                       throw new RuntimeException("Handler closed");
+               }
+
+               Assert.assertTrue(request instanceof 
CollectCoordinationRequest);
+               CollectCoordinationRequest collectRequest = 
(CollectCoordinationRequest) request;
+
+               for (int i = random.nextInt(3) + 1; i > 0; i--) {
+                       if (checkpointCountDown > 0) {
+                               checkpointCountDown--;
+                               if (checkpointCountDown == 0) {
+                                       checkpointedData = checkpointingData;
+                                       checkpointedBuffered = 
checkpointingBuffered;
+                                       checkpointedOffset = 
checkpointingOffset;
+                               }
+                       }
+
+                       int r = random.nextInt(10);
+                       if (r < 6) {
+                               // with 60% chance we add data
+                               int size = Math.min(data.size(), BATCH_SIZE * 2 
- buffered.size());
+                               if (size > 0) {
+                                       size = random.nextInt(size) + 1;
+                               }
+                               for (int j = 0; j < size; j++) {
+                                       buffered.add(data.removeFirst());
+                               }
+
+                               if (data.isEmpty()) {
+                                       buildAccumulatorResults();
+                                       closed = true;
+                                       break;
+                               }
+                       } else if (r < 9) {
+                               // with 30% chance we do a checkpoint completed 
in the future
+                               if (checkpointCountDown == 0) {
+                                       checkpointCountDown = random.nextInt(5) 
+ 1;
+                                       checkpointingData = new 
ArrayList<>(data);
+                                       checkpointingBuffered = new 
ArrayList<>(buffered);
+                                       checkpointingOffset = offset;
+                               }
+                       } else {
+                               // with 10% chance we fail
+                               checkpointCountDown = 0;
+                               version = UUID.randomUUID().toString();
+                               data = new LinkedList<>(checkpointedData);
+                               buffered = new 
LinkedList<>(checkpointedBuffered);
+                               offset = checkpointedOffset;
+                       }
+               }
+
+               Assert.assertTrue(offset <= collectRequest.getOffset());
+
+               List<T> subList = Collections.emptyList();
+               if (collectRequest.getVersion().equals(version)) {
+                       while (buffered.size() > 0 && 
collectRequest.getOffset() > offset) {
+                               buffered.removeFirst();
+                               offset++;
+                       }
+                       subList = new ArrayList<>();
+                       Iterator<T> iterator = buffered.iterator();
+                       for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); 
i++) {
+                               subList.add(iterator.next());
+                       }
+               }
+
+               CoordinationResponse response;
+               try {
+                       if (random.nextBoolean()) {
+                               // with 50% chance we return valid result
+                               response = new 
CollectCoordinationResponse<>(version, checkpointedOffset, subList, serializer);
+                       } else {
+                               // with 50% chance we return invalid result
+                               response = new CollectCoordinationResponse<>(
+                                       collectRequest.getVersion(),
+                                       -1,
+                                       Collections.emptyList(),
+                                       serializer);
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+               return CompletableFuture.completedFuture(response);
+       }
+
+       public boolean isClosed() {
+               return closed;
+       }
+
+       public Map<String, OptionalFailure<Object>> getAccumulatorResults() {
+               return accumulatorResults;
+       }
+
+       private void buildAccumulatorResults() {
+               List<T> finalResults = new ArrayList<>(buffered);
+               SerializedListAccumulator<byte[]> listAccumulator = new 
SerializedListAccumulator<>();
+               try {
+                       byte[] serializedResult =
+                               CollectSinkFunction.serializeAccumulatorResult(
+                                       offset, version, checkpointedOffset, 
finalResults, serializer);
+                       listAccumulator.add(serializedResult, 
BytePrimitiveArraySerializer.INSTANCE);
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+
+               accumulatorResults.put(accumulatorName, 
OptionalFailure.of(listAccumulator.getLocalValue()));
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
new file mode 100644
index 0000000..3a40870
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.OptionalFailure;
+
+import org.junit.Assert;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link JobClient} to test fetching SELECT query results.
+ */
+public class TestJobClient implements JobClient, CoordinationRequestGateway {
+
+       private final JobID jobId;
+       private final OperatorID operatorId;
+       private final CoordinationRequestHandler handler;
+       private final JobInfoProvider infoProvider;
+
+       private JobStatus jobStatus;
+       private JobExecutionResult jobExecutionResult;
+
+       public TestJobClient(
+                       JobID jobId,
+                       OperatorID operatorId,
+                       CoordinationRequestHandler handler,
+                       JobInfoProvider infoProvider) {
+               this.jobId = jobId;
+               this.operatorId = operatorId;
+               this.handler = handler;
+               this.infoProvider = infoProvider;
+
+               this.jobStatus = JobStatus.RUNNING;
+               this.jobExecutionResult = null;
+       }
+
+       @Override
+       public JobID getJobID() {
+               return jobId;
+       }
+
+       @Override
+       public CompletableFuture<JobStatus> getJobStatus() {
+               return CompletableFuture.completedFuture(jobStatus);
+       }
+
+       @Override
+       public CompletableFuture<Void> cancel() {
+               jobStatus = JobStatus.CANCELED;
+               return CompletableFuture.completedFuture(null);
+       }
+
+       @Override
+       public CompletableFuture<String> stopWithSavepoint(boolean 
advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<String> triggerSavepoint(@Nullable String 
savepointDirectory) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Map<String, Object>> 
getAccumulators(ClassLoader classLoader) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(ClassLoader userClassloader) {
+               return CompletableFuture.completedFuture(jobExecutionResult);
+       }
+
+       @Override
+       public CompletableFuture<CoordinationResponse> 
sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+               if (jobStatus.isGloballyTerminalState()) {
+                       throw new RuntimeException("Job terminated");
+               }
+
+               Assert.assertEquals(this.operatorId, operatorId);
+               CoordinationResponse response;
+               try {
+                       response = 
handler.handleCoordinationRequest(request).get();
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               if (infoProvider.isJobFinished()) {
+                       jobStatus = JobStatus.FINISHED;
+                       jobExecutionResult = new JobExecutionResult(jobId, 0, 
infoProvider.getAccumulatorResults());
+               }
+
+               return CompletableFuture.completedFuture(response);
+       }
+
+       /**
+        * Interface to provide job related info for {@link TestJobClient}.
+        */
+       public interface JobInfoProvider {
+
+               boolean isJobFinished();
+
+               Map<String, OptionalFailure<Object>> getAccumulatorResults();
+       }
+}

Reply via email to