Repository: flink
Updated Branches:
  refs/heads/master 035f62969 -> da23ee38e


[hotfix] [contrib] Fix robustness of DataStreamUtils stream collecting


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3e3bd52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3e3bd52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3e3bd52

Branch: refs/heads/master
Commit: d3e3bd528ed972680baebda05acce0bf39af6798
Parents: 77afe28
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 26 20:23:20 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 30 14:41:34 2016 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/CollectSink.java    | 118 ++++--------
 .../contrib/streaming/DataStreamIterator.java   | 134 -------------
 .../contrib/streaming/DataStreamUtils.java      |  65 ++++---
 .../contrib/streaming/SocketStreamIterator.java | 190 +++++++++++++++++++
 .../flink/contrib/streaming/CollectITCase.java  |  48 ++---
 .../streaming/SocketStreamIteratorTest.java     | 104 ++++++++++
 6 files changed, 398 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
index 53b84ef..161eb16 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
@@ -17,35 +17,30 @@
 
 package org.apache.flink.contrib.streaming;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.InetAddress;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * A specialized data sink to be used by DataStreamUtils.collect.
  */
 class CollectSink<IN> extends RichSinkFunction<IN> {
+       
        private static final long serialVersionUID = 1L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(SocketClientSink.class);
-
        private final InetAddress hostIp;
        private final int port;
        private final TypeSerializer<IN> serializer;
+       
        private transient Socket client;
-       private transient DataOutputStream dataOutputStream;
-       private StreamWriterDataOutputView streamWriter;
+       private transient OutputStream outputStream;
+       private transient DataOutputViewStreamWrapper streamWriter;
 
        /**
         * Creates a CollectSink that will send the data to the specified host.
@@ -60,57 +55,13 @@ class CollectSink<IN> extends RichSinkFunction<IN> {
                this.serializer = serializer;
        }
 
-       /**
-        * Initializes the connection to Socket.
-        */
-       public void initializeConnection() {
-               OutputStream outputStream;
-               try {
-                       client = new Socket(hostIp, port);
-                       outputStream = client.getOutputStream();
-                       streamWriter = new 
StreamWriterDataOutputView(outputStream);
-               } catch (IOException e) {
-                       throw new RuntimeException(e);
-               }
-               dataOutputStream = new DataOutputStream(outputStream);
-       }
-
-       /**
-        * Called when new data arrives to the sink, and forwards it to Socket.
-        *
-        * @param value
-        *                      The incoming data
-        */
        @Override
-       public void invoke(IN value) {
+       public void invoke(IN value) throws Exception {
                try {
                        serializer.serialize(value, streamWriter);
-               } catch (IOException e) {
-                       if(LOG.isErrorEnabled()){
-                               LOG.error("Cannot send message to socket server 
at " + hostIp.toString() + ":" + port, e);
-                       }
                }
-       }
-
-       /**
-        * Closes the connection of the Socket client.
-        */
-       private void closeConnection(){
-               try {
-                       dataOutputStream.flush();
-                       client.close();
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while closing 
connection with socket server at "
-                                       + hostIp.toString() + ":" + port, e);
-               } finally {
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (IOException e) {
-                                       LOG.error("Cannot close connection with 
socket server at "
-                                                       + hostIp.toString() + 
":" + port, e);
-                               }
-                       }
+               catch (Exception e) {
+                       throw new IOException("Error sending data back to 
client (" + hostIp.toString() + ":" + port + ')', e);
                }
        }
 
@@ -119,34 +70,47 @@ class CollectSink<IN> extends RichSinkFunction<IN> {
         * @param parameters Configuration.
         */
        @Override
-       public void open(Configuration parameters) {
-               initializeConnection();
+       public void open(Configuration parameters) throws Exception {
+               try {
+                       client = new Socket(hostIp, port);
+                       outputStream = client.getOutputStream();
+                       streamWriter = new 
DataOutputViewStreamWrapper(outputStream);
+               }
+               catch (IOException e) {
+                       throw new IOException("Cannot connect to the client to 
send back the stream", e);
+               }
        }
 
        /**
         * Closes the connection with the Socket server.
         */
        @Override
-       public void close() {
-               closeConnection();
-       }
-
-       private static class StreamWriterDataOutputView extends 
DataOutputStream implements DataOutputView {
-
-               public StreamWriterDataOutputView(OutputStream stream) {
-                       super(stream);
-               }
-
-               public void skipBytesToWrite(int numBytes) throws IOException {
-                       for (int i = 0; i < numBytes; i++) {
-                               write(0);
+       public void close() throws Exception {
+               try {
+                       if (outputStream != null) {
+                               outputStream.flush();
+                               outputStream.close();
+                       }
+                       
+                       // first regular attempt to cleanly close. Failing that 
will escalate
+                       if (client != null) {
+                               client.close();
                        }
                }
-
-               public void write(DataInputView source, int numBytes) throws 
IOException {
-                       byte[] data = new byte[numBytes];
-                       source.readFully(data);
-                       write(data);
+               catch (Exception e) {
+                       throw new IOException("Error while closing connection 
that streams data back to client at "
+                                       + hostIp.toString() + ":" + port, e);
+               }
+               finally {
+                       // if we failed prior to closing the client, close it
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               }
+                               catch (Throwable t) {
+                                       // best effort to close, we do not care 
about an exception here any more
+                               }
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
deleted file mode 100644
index a98740e..0000000
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import java.util.Iterator;
-import java.net.ServerSocket;
-import java.io.InputStream;
-import java.io.IOException;
-import java.io.EOFException;
-import java.util.NoSuchElementException;
-import java.util.concurrent.CountDownLatch;
-import java.io.DataInputStream;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-
-class DataStreamIterator<T> implements Iterator<T> {
-
-       ServerSocket socket;
-       InputStream tcpStream;
-       T next;
-       private final CountDownLatch connectionAccepted = new CountDownLatch(1);
-       private volatile StreamReaderDataInputView streamReader;
-       private final TypeSerializer<T> serializer;
-
-       DataStreamIterator(TypeSerializer serializer) {
-               this.serializer = serializer;
-               try {
-                       socket = new ServerSocket(0, 1, null);
-               } catch (IOException e) {
-                       throw new RuntimeException("DataStreamIterator: an I/O 
error occurred when opening the socket", e);
-               }
-               (new AcceptThread()).start();
-       }
-
-       private class AcceptThread extends Thread {
-               public void run() {
-                       try {
-                               tcpStream = socket.accept().getInputStream();
-                               streamReader = new 
StreamReaderDataInputView(tcpStream);
-                               connectionAccepted.countDown();
-                       } catch (IOException e) {
-                               throw new 
RuntimeException("DataStreamIterator.AcceptThread failed", e);
-                       }
-               }
-       }
-
-       /**
-        * Returns the port on which the iterator is getting the data. (Used 
internally.)
-        * @return The port
-        */
-       public int getPort() {
-               return socket.getLocalPort();
-       }
-
-       /**
-        * Returns true if the DataStream has more elements.
-        * (Note: blocks if there will be more elements, but they are not 
available yet.)
-        * @return true if the DataStream has more elements
-        */
-       @Override
-       public boolean hasNext() {
-               if (next == null) {
-                       readNextFromStream();
-               }
-               return next != null;
-       }
-
-       /**
-        * Returns the next element of the DataStream. (Blocks if it is not 
available yet.)
-        * @return The element
-        * @throws NoSuchElementException if the stream has already ended
-        */
-       @Override
-       public T next() {
-               if (next == null) {
-                       readNextFromStream();
-                       if (next == null) {
-                               throw new NoSuchElementException();
-                       }
-               }
-               T current = next;
-               next = null;
-               return current;
-       }
-
-       private void readNextFromStream(){
-               try {
-                       connectionAccepted.await();
-               } catch (InterruptedException e) {
-                       throw new RuntimeException("The calling thread of 
DataStreamIterator.readNextFromStream was interrupted.");
-               }
-               try {
-                       next = serializer.deserialize(streamReader);
-               } catch (EOFException e) {
-                       next = null;
-               } catch (IOException e) {
-                       throw new RuntimeException("DataStreamIterator could 
not read from deserializedStream", e);
-               }
-       }
-
-       private static class StreamReaderDataInputView extends DataInputStream 
implements DataInputView {
-
-               public StreamReaderDataInputView(InputStream stream) {
-                       super(stream);
-               }
-
-               public void skipBytesToRead(int numBytes) throws IOException {
-                       while (numBytes > 0) {
-                               int skipped = skipBytes(numBytes);
-                               numBytes -= skipped;
-                       }
-               }
-       }
-
-       @Override
-       public void remove() {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
index 6f4e8d9..d4ef9ee 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -17,17 +17,18 @@
 
 package org.apache.flink.contrib.streaming;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.runtime.net.ConnectionUtils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
 
 public final class DataStreamUtils {
 
@@ -35,59 +36,73 @@ public final class DataStreamUtils {
          * Returns an iterator to iterate over the elements of the DataStream.
          * @return The iterator
+        * @throws IOException Thrown, if no network address can be determined through which
+        *                     the streaming program can send the data back.
          */
-       public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
-               TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig());
-               DataStreamIterator<OUT> it = new DataStreamIterator<OUT>(serializer);
+       public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
+               
+               TypeSerializer<OUT> serializer = stream.getType().createSerializer(
+                               stream.getExecutionEnvironment().getConfig());
 
                //Find out what IP of us should be given to CollectSink, that it will be able to connect to
                StreamExecutionEnvironment env = stream.getExecutionEnvironment();
                InetAddress clientAddress;
-               if(env instanceof RemoteStreamEnvironment) {
+               
+               if (env instanceof RemoteStreamEnvironment) {
                        String host = ((RemoteStreamEnvironment)env).getHost();
                        int port = ((RemoteStreamEnvironment)env).getPort();
                        try {
                                clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
-                       } catch (IOException e) {
-                               throw new RuntimeException("IOException while trying to connect to the master", e);
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Could not determine a suitable network address to " +
+                                               "receive back data from the streaming program.", e);
                        }
                } else {
                        try {
                                clientAddress = InetAddress.getLocalHost();
                        } catch (UnknownHostException e) {
-                               throw new RuntimeException("getLocalHost failed", e);
+                               throw new IOException("Could not determine this machine's own local address to " +
+                                               "receive back data from the streaming program.", e);
                        }
                }
 
+               // Open the server socket only after the address lookup above succeeded,
+               // so that a failed lookup does not leak the socket.
+               SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);
+               
-               DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, it.getPort(), serializer));
+               DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
                sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
 
-               (new CallExecute(stream)).start();
-
-               return it;
+               (new CallExecute(env, iter)).start();
+               
+               return iter;
        }
 
-       private static class CallExecute<OUT> extends Thread {
+       private static class CallExecute extends Thread {
 
-               DataStream<OUT> stream;
+               private final StreamExecutionEnvironment toTrigger;
+               private final SocketStreamIterator<?> toNotify;
 
-               public CallExecute(DataStream<OUT> stream) {
-                       this.stream = stream;
+               private CallExecute(StreamExecutionEnvironment toTrigger, 
SocketStreamIterator<?> toNotify) {
+                       this.toTrigger = toTrigger;
+                       this.toNotify = toNotify;
                }
 
                @Override
                public void run(){
                        try {
-                               stream.getExecutionEnvironment().execute();
-                       } catch (Exception e) {
-                               throw new RuntimeException("Exception in 
execute()", e);
+                               toTrigger.execute();
+                       }
+                       catch (Throwable t) {
+                               toNotify.notifyOfError(t);
                        }
                }
        }
 
+       // 
------------------------------------------------------------------------
+
        /**
         * Private constructor to prevent instantiation.
         */
-       private DataStreamUtils() {
-               throw new RuntimeException();
-       }
+       private DataStreamUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
new file mode 100644
index 0000000..c65be85
--- /dev/null
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Iterator;
+import java.net.ServerSocket;
+import java.io.IOException;
+import java.io.EOFException;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+
+/**
+ * An iterator that returns the data from a socket stream.
+ * 
+ * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()}
+ * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving,
+ * deserializing, and returning the data from that socket.
+ * 
+ * <p>Once the stream has ended, {@link #hasNext()} keeps returning {@code false}. If an error
+ * was reported via {@link #notifyOfError(Throwable)}, reads fail with that error as the cause
+ * instead of signaling a regular end of the stream.
+ * 
+ * @param <T> The type of elements returned from the iterator.
+ */
+class SocketStreamIterator<T> implements Iterator<T> {
+
+       /** Server socket to listen at */
+       private final ServerSocket socket;
+
+       /** Serializer to deserialize stream */
+       private final TypeSerializer<T> serializer;
+
+       /** Set by the same thread that reads it */
+       private DataInputViewStreamWrapper inStream;
+
+       /** Next element, handover from hasNext() to next() */
+       private T next;
+
+       /** Set when the stream ended regularly, so that further reads do not touch the closed socket */
+       private boolean endOfStream;
+
+       /** The socket for the specific stream. Volatile, because notifyOfError() may close it from another thread */
+       private volatile Socket connectedSocket;
+
+       /** Async error, for example by the executor of the program that produces the stream */
+       private volatile Throwable error;
+
+
+       SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
+               this.serializer = serializer;
+               try {
+                       socket = new ServerSocket(0, 1);
+               }
+               catch (IOException e) {
+                       throw new IOException("Could not open socket to receive back stream results", e);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  properties
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns the port on which the iterator is getting the data. (Used internally.)
+        * @return The port
+        */
+       public int getPort() {
+               return socket.getLocalPort();
+       }
+
+       /**
+        * Returns the local address that the server socket is bound to.
+        */
+       public InetAddress getBindAddress() {
+               return socket.getInetAddress();
+       }
+
+       /**
+        * Closes the connected socket (if any) and the server socket, ignoring any exceptions.
+        */
+       public void close() {
+               closeSockets();
+       }
+
+       // ------------------------------------------------------------------------
+       //  iterator semantics
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns true if the DataStream has more elements.
+        * (Note: blocks if there will be more elements, but they are not available yet.)
+        * @return true if the DataStream has more elements
+        */
+       @Override
+       public boolean hasNext() {
+               if (next == null) {
+                       try {
+                               next = readNextFromStream();
+                       } catch (Exception e) {
+                               throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e);
+                       }
+               }
+
+               return next != null;
+       }
+
+       /**
+        * Returns the next element of the DataStream. (Blocks if it is not available yet.)
+        * @return The element
+        * @throws NoSuchElementException if the stream has already ended
+        */
+       @Override
+       public T next() {
+               if (hasNext()) {
+                       T current = next;
+                       next = null;
+                       return current;
+               } else {
+                       throw new NoSuchElementException();
+               }
+       }
+
+       @Override
+       public void remove() {
+               throw new UnsupportedOperationException();
+       }
+
+       /**
+        * Reads the next element, accepting the incoming connection first if necessary.
+        * Returns null once the stream has ended regularly.
+        */
+       private T readNextFromStream() throws Exception {
+               if (endOfStream) {
+                       // the sockets are closed already, do not attempt to read from them again
+                       return null;
+               }
+
+               try {
+                       if (inStream == null) {
+                               Socket accepted = socket.accept();
+                               connectedSocket = accepted;
+                               inStream = new DataInputViewStreamWrapper(accepted.getInputStream());
+                       }
+
+                       return serializer.deserialize(inStream);
+               }
+               catch (EOFException e) {
+                       closeSockets();
+
+                       if (error != null) {
+                               // the producing program failed, so this is not a regular end of the stream
+                               throw rootCauseException();
+                       }
+
+                       endOfStream = true;
+                       return null;
+               }
+               catch (Exception e) {
+                       if (error == null) {
+                               throw e;
+                       }
+                       else {
+                               // throw the root cause error
+                               throw rootCauseException();
+                       }
+               }
+       }
+
+       /**
+        * Wraps the asynchronously reported error (which must have been set) as the cause of a new exception.
+        */
+       private Exception rootCauseException() {
+               final Throwable cause = error;
+               return new Exception("Receiving stream failed: " + cause.getMessage(), cause);
+       }
+
+       // ------------------------------------------------------------------------
+       //  errors
+       // ------------------------------------------------------------------------
+
+       /**
+        * Reports an asynchronous error, for example a failure of the program producing the stream.
+        * Only the first reported error is kept; the sockets are closed to wake up blocked reads.
+        */
+       public void notifyOfError(Throwable error) {
+               if (error != null && this.error == null) {
+                       this.error = error;
+
+                       // this should wake up any blocking calls
+                       closeSockets();
+               }
+       }
+
+       /**
+        * Closes the connected socket (if one was accepted) and the server socket.
+        * Exceptions are ignored, as this is only a best-effort cleanup.
+        */
+       private void closeSockets() {
+               final Socket connected = connectedSocket;
+               if (connected != null) {
+                       try {
+                               connected.close();
+                       } catch (Throwable ignored) {}
+               }
+
+               try {
+                       socket.close();
+               } catch (Throwable ignored) {}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index fab5c9a..10ea85c 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -18,43 +18,45 @@
 
 package org.apache.flink.contrib.streaming;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.Test;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Assert;
 
 import java.util.Iterator;
 
+import static org.junit.Assert.*;
+
 /**
  * This test verifies the behavior of DataStreamUtils.collect.
  */
 public class CollectITCase {
 
        @Test
-       public void testCollect() {
-
-               Configuration config = new Configuration();
-               ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(config, false);
-               cluster.start();
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-
-               long N = 10;
-               DataStream<Long> stream = env.generateSequence(1, N);
-
-               long i = 1;
-               for(Iterator it = DataStreamUtils.collect(stream); 
it.hasNext(); ) {
-                       Long x = (Long) it.next();
-                       if(x != i) {
-                               Assert.fail(String.format("Should have got %d, 
got %d instead.", i, x));
+       public void testCollect() throws Exception {
+               final ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(new Configuration(), false);
+               try {
+                       cluster.start();
+
+                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", 
cluster.getLeaderRPCPort());
+       
+                       final long N = 10;
+                       DataStream<Long> stream = env.generateSequence(1, N);
+       
+                       long i = 1;
+                       for (Iterator<Long> it = 
DataStreamUtils.collect(stream); it.hasNext(); ) {
+                               long x = it.next();
+                               assertEquals("received wrong element", i, x);
+                               i++;
                        }
-                       i++;
+                       
+                       assertEquals("received wrong number of elements", N + 
1, i);
                }
-               if(i != N + 1) {
-                       Assert.fail(String.format("Should have collected %d 
numbers, got %d instead.", N, i - 1));
+               finally {
+                       cluster.stop();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
new file mode 100644
index 0000000..f8739a3
--- /dev/null
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Test;
+
+import java.net.Socket;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class SocketStreamIteratorTest {
+       
+       @Test
+       public void testIterator() throws Exception {
+               
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               
+               final long seed = new Random().nextLong();
+               final int numElements = 1000;
+               
+               final SocketStreamIterator<Long> iterator = new 
SocketStreamIterator<>(LongSerializer.INSTANCE);
+               
+               Thread writer = new Thread() {
+                       
+                       @Override
+                       public void run() {
+                               try {
+                                       try (Socket sock = new 
Socket(iterator.getBindAddress(), iterator.getPort());
+                                               DataOutputViewStreamWrapper out 
= new DataOutputViewStreamWrapper(sock.getOutputStream()))
+                                       {
+                                               final TypeSerializer<Long> 
serializer = LongSerializer.INSTANCE;
+                                               final Random rnd = new 
Random(seed);
+                                               
+                                               for (int i = 0; i < 
numElements; i++) {
+                                                       
serializer.serialize(rnd.nextLong(), out);
+                                               }
+                                       }
+                               }
+                               catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               
+               writer.start();
+               
+               final Random validator = new Random(seed);
+               for (int i = 0; i < numElements; i++) {
+                       assertTrue(iterator.hasNext());
+                       assertTrue(iterator.hasNext());
+                       assertEquals(validator.nextLong(), 
iterator.next().longValue());
+               }
+               
+               assertFalse(iterator.hasNext());
+               writer.join();
+               assertFalse(iterator.hasNext());
+       }
+
+       @Test
+       public void testIteratorWithException() throws Exception {
+
+               final SocketStreamIterator<Long> iterator = new 
SocketStreamIterator<>(LongSerializer.INSTANCE);
+
+               // asynchronously set an error
+               new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       Thread.sleep(100);
+                               } catch (InterruptedException ignored) {}
+                               iterator.notifyOfError(new Exception("test"));
+                       }
+               }.start();
+
+               try {
+                       iterator.hasNext();
+               }
+               catch (Exception e) {
+                       assertTrue(e.getCause().getMessage().contains("test"));
+               }
+       }
+}

Reply via email to