Repository: flink Updated Branches: refs/heads/master 035f62969 -> da23ee38e
[hotfix] [contrib] Fix robustness of DataStreamUtils stream collecting Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3e3bd52 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3e3bd52 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3e3bd52 Branch: refs/heads/master Commit: d3e3bd528ed972680baebda05acce0bf39af6798 Parents: 77afe28 Author: Stephan Ewen <se...@apache.org> Authored: Thu May 26 20:23:20 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon May 30 14:41:34 2016 +0200 ---------------------------------------------------------------------- .../flink/contrib/streaming/CollectSink.java | 118 ++++-------- .../contrib/streaming/DataStreamIterator.java | 134 ------------- .../contrib/streaming/DataStreamUtils.java | 65 ++++--- .../contrib/streaming/SocketStreamIterator.java | 190 +++++++++++++++++++ .../flink/contrib/streaming/CollectITCase.java | 48 ++--- .../streaming/SocketStreamIteratorTest.java | 104 ++++++++++ 6 files changed, 398 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java index 53b84ef..161eb16 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java @@ -17,35 +17,30 @@ package org.apache.flink.contrib.streaming; -import java.io.DataOutputStream; import java.io.IOException; 
import java.io.OutputStream; import java.net.Socket; import java.net.InetAddress; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SocketClientSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputView; /** * A specialized data sink to be used by DataStreamUtils.collect. */ class CollectSink<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class); - private final InetAddress hostIp; private final int port; private final TypeSerializer<IN> serializer; + private transient Socket client; - private transient DataOutputStream dataOutputStream; - private StreamWriterDataOutputView streamWriter; + private transient OutputStream outputStream; + private transient DataOutputViewStreamWrapper streamWriter; /** * Creates a CollectSink that will send the data to the specified host. @@ -60,57 +55,13 @@ class CollectSink<IN> extends RichSinkFunction<IN> { this.serializer = serializer; } - /** - * Initializes the connection to Socket. - */ - public void initializeConnection() { - OutputStream outputStream; - try { - client = new Socket(hostIp, port); - outputStream = client.getOutputStream(); - streamWriter = new StreamWriterDataOutputView(outputStream); - } catch (IOException e) { - throw new RuntimeException(e); - } - dataOutputStream = new DataOutputStream(outputStream); - } - - /** - * Called when new data arrives to the sink, and forwards it to Socket.
- * - * @param value - * The incoming data - */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { try { serializer.serialize(value, streamWriter); - } catch (IOException e) { - if(LOG.isErrorEnabled()){ - LOG.error("Cannot send message to socket server at " + hostIp.toString() + ":" + port, e); - } } - } - - /** - * Closes the connection of the Socket client. - */ - private void closeConnection(){ - try { - dataOutputStream.flush(); - client.close(); - } catch (IOException e) { - throw new RuntimeException("Error while closing connection with socket server at " - + hostIp.toString() + ":" + port, e); - } finally { - if (client != null) { - try { - client.close(); - } catch (IOException e) { - LOG.error("Cannot close connection with socket server at " - + hostIp.toString() + ":" + port, e); - } - } + catch (Exception e) { + throw new IOException("Error sending data back to client (" + hostIp.toString() + ":" + port + ')', e); } } @@ -119,34 +70,47 @@ class CollectSink<IN> extends RichSinkFunction<IN> { * @param parameters Configuration. */ @Override - public void open(Configuration parameters) { - initializeConnection(); + public void open(Configuration parameters) throws Exception { + try { + client = new Socket(hostIp, port); + outputStream = client.getOutputStream(); + streamWriter = new DataOutputViewStreamWrapper(outputStream); + } + catch (IOException e) { + throw new IOException("Cannot connect to the client at " + hostIp.toString() + ":" + port + " to send back the stream", e); + } } /** * Closes the connection with the Socket server.
*/ @Override - public void close() { - closeConnection(); - } - - private static class StreamWriterDataOutputView extends DataOutputStream implements DataOutputView { - - public StreamWriterDataOutputView(OutputStream stream) { - super(stream); - } - - public void skipBytesToWrite(int numBytes) throws IOException { - for (int i = 0; i < numBytes; i++) { - write(0); + public void close() throws Exception { + try { + if (outputStream != null) { + outputStream.flush(); + outputStream.close(); + } + + // first regular attempt to cleanly close. Failing that will escalate + if (client != null) { + client.close(); } } - - public void write(DataInputView source, int numBytes) throws IOException { - byte[] data = new byte[numBytes]; - source.readFully(data); - write(data); + catch (Exception e) { + throw new IOException("Error while closing connection that streams data back to client at " + + hostIp.toString() + ":" + port, e); + } + finally { + // if we failed prior to closing the client, close it + if (client != null) { + try { + client.close(); + } + catch (Throwable t) { + // best effort to close, we do not care about an exception here any more + } + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java deleted file mode 100644 index a98740e..0000000 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming; - -import java.util.Iterator; -import java.net.ServerSocket; -import java.io.InputStream; -import java.io.IOException; -import java.io.EOFException; -import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; -import java.io.DataInputStream; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; - -class DataStreamIterator<T> implements Iterator<T> { - - ServerSocket socket; - InputStream tcpStream; - T next; - private final CountDownLatch connectionAccepted = new CountDownLatch(1); - private volatile StreamReaderDataInputView streamReader; - private final TypeSerializer<T> serializer; - - DataStreamIterator(TypeSerializer serializer) { - this.serializer = serializer; - try { - socket = new ServerSocket(0, 1, null); - } catch (IOException e) { - throw new RuntimeException("DataStreamIterator: an I/O error occurred when opening the socket", e); - } - (new AcceptThread()).start(); - } - - private class AcceptThread extends Thread { - public void run() { - try { - tcpStream = socket.accept().getInputStream(); - streamReader = new StreamReaderDataInputView(tcpStream); - connectionAccepted.countDown(); - } catch (IOException e) { - throw new 
RuntimeException("DataStreamIterator.AcceptThread failed", e); - } - } - } - - /** - * Returns the port on which the iterator is getting the data. (Used internally.) - * @return The port - */ - public int getPort() { - return socket.getLocalPort(); - } - - /** - * Returns true if the DataStream has more elements. - * (Note: blocks if there will be more elements, but they are not available yet.) - * @return true if the DataStream has more elements - */ - @Override - public boolean hasNext() { - if (next == null) { - readNextFromStream(); - } - return next != null; - } - - /** - * Returns the next element of the DataStream. (Blocks if it is not available yet.) - * @return The element - * @throws NoSuchElementException if the stream has already ended - */ - @Override - public T next() { - if (next == null) { - readNextFromStream(); - if (next == null) { - throw new NoSuchElementException(); - } - } - T current = next; - next = null; - return current; - } - - private void readNextFromStream(){ - try { - connectionAccepted.await(); - } catch (InterruptedException e) { - throw new RuntimeException("The calling thread of DataStreamIterator.readNextFromStream was interrupted."); - } - try { - next = serializer.deserialize(streamReader); - } catch (EOFException e) { - next = null; - } catch (IOException e) { - throw new RuntimeException("DataStreamIterator could not read from deserializedStream", e); - } - } - - private static class StreamReaderDataInputView extends DataInputStream implements DataInputView { - - public StreamReaderDataInputView(InputStream stream) { - super(stream); - } - - public void skipBytesToRead(int numBytes) throws IOException { - while (numBytes > 0) { - int skipped = skipBytes(numBytes); - numBytes -= skipped; - } - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java index 6f4e8d9..d4ef9ee 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java @@ -17,17 +17,18 @@ package org.apache.flink.contrib.streaming; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.Iterator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.runtime.net.ConnectionUtils; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Iterator; public final class DataStreamUtils { @@ -35,59 +36,69 @@ public final class DataStreamUtils { * Returns an iterator to iterate over the elements of the DataStream.
* @return The iterator */ - public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) { - TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig()); - DataStreamIterator<OUT> it = new DataStreamIterator<OUT>(serializer); + public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException { + + TypeSerializer<OUT> serializer = stream.getType().createSerializer( + stream.getExecutionEnvironment().getConfig()); //Find out what IP of us should be given to CollectSink, that it will be able to connect to StreamExecutionEnvironment env = stream.getExecutionEnvironment(); InetAddress clientAddress; - if(env instanceof RemoteStreamEnvironment) { + + if (env instanceof RemoteStreamEnvironment) { String host = ((RemoteStreamEnvironment)env).getHost(); int port = ((RemoteStreamEnvironment)env).getPort(); try { clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400); - } catch (IOException e) { - throw new RuntimeException("IOException while trying to connect to the master", e); + } + catch (Exception e) { + throw new IOException("Could not determine a suitable network address to " + + "receive back data from the streaming program.", e); } } else { try { clientAddress = InetAddress.getLocalHost(); } catch (UnknownHostException e) { - throw new RuntimeException("getLocalHost failed", e); + throw new IOException("Could not determine this machine's own local address to " + + "receive back data from the streaming program.", e); } } - DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, it.getPort(), serializer)); + // open the server socket only after the address is known, so a failed lookup cannot leak it + SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer); + DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, iter.getPort(), serializer)); sink.setParallelism(1); // It would not work if multiple instances would connect to the same port - (new CallExecute(stream)).start(); - -
return it; + (new CallExecute(env, iter)).start(); + + return iter; } - private static class CallExecute<OUT> extends Thread { + private static class CallExecute extends Thread { - DataStream<OUT> stream; + private final StreamExecutionEnvironment toTrigger; + private final SocketStreamIterator<?> toNotify; - public CallExecute(DataStream<OUT> stream) { - this.stream = stream; + private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) { + this.toTrigger = toTrigger; + this.toNotify = toNotify; } @Override public void run(){ try { - stream.getExecutionEnvironment().execute(); - } catch (Exception e) { - throw new RuntimeException("Exception in execute()", e); + toTrigger.execute(); + } + catch (Throwable t) { + toNotify.notifyOfError(t); } } } + // ------------------------------------------------------------------------ + /** * Private constructor to prevent instantiation. */ - private DataStreamUtils() { - throw new RuntimeException(); - } + private DataStreamUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java new file mode 100644 index 0000000..c65be85 --- /dev/null +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming; + +import java.net.InetAddress; +import java.net.Socket; +import java.util.Iterator; +import java.net.ServerSocket; +import java.io.IOException; +import java.io.EOFException; +import java.util.NoSuchElementException; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; + +/** + * An iterator that returns the data from a socket stream. + * + * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()} + * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving, + * deserializing, and returning the data from that socket. + * + * @param <T> The type of elements returned from the iterator. 
+ */ +class SocketStreamIterator<T> implements Iterator<T> { + + /** Server socket to listen at */ + private final ServerSocket socket; + + /** Serializer to deserialize stream */ + private final TypeSerializer<T> serializer; + + /** Set by the same thread that reads it */ + private DataInputViewStreamWrapper inStream; + + /** Next element, handover from hasNext() to next() */ + private T next; + + /** The socket for the specific stream; volatile because notifyOfError() closes it from another thread */ + private volatile Socket connectedSocket; + + /** Async error, for example by the executor of the program that produces the stream */ + private volatile Throwable error; + + /** Set by the reading thread once the stream ended, so later calls never read from the closed socket */ + private boolean endOfStream; + + SocketStreamIterator(TypeSerializer<T> serializer) throws IOException { + this.serializer = serializer; + try { + socket = new ServerSocket(0, 1); + } + catch (IOException e) { + throw new IOException("Could not open socket to receive back stream results", e); + } + } + + // ------------------------------------------------------------------------ + // properties + // ------------------------------------------------------------------------ + + /** + * Returns the port on which the iterator is getting the data. (Used internally.) + * @return The port + */ + public int getPort() { + return socket.getLocalPort(); + } + + public InetAddress getBindAddress() { + return socket.getInetAddress(); + } + + public void close() { + if (connectedSocket != null) { + try { + connectedSocket.close(); + } catch (Throwable ignored) {} + } + + try { + socket.close(); + } catch (Throwable ignored) {} + } + + // ------------------------------------------------------------------------ + // iterator semantics + // ------------------------------------------------------------------------ + + /** + * Returns true if the DataStream has more elements. + * (Note: blocks if there will be more elements, but they are not available yet.)
+ * @return true if the DataStream has more elements + */ + @Override + public boolean hasNext() { + if (next == null && !endOfStream) { + try { + next = readNextFromStream(); + } catch (Exception e) { + throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e); + } + } + return next != null; + } + + /** + * Returns the next element of the DataStream. (Blocks if it is not available yet.) + * @return The element + * @throws NoSuchElementException if the stream has already ended + */ + @Override + public T next() { + if (hasNext()) { + T current = next; + next = null; + return current; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private T readNextFromStream() throws Exception { + try { + if (inStream == null) { + connectedSocket = socket.accept(); + inStream = new DataInputViewStreamWrapper(connectedSocket.getInputStream()); + } + + return serializer.deserialize(inStream); + } + catch (EOFException e) { + endOfStream = true; + try { + connectedSocket.close(); + } catch (Throwable ignored) {} + try { + socket.close(); + } catch (Throwable ignored) {} + return null; + } + catch (Exception e) { + if (error == null) { + throw e; + } + else { + // throw the root cause error + throw new Exception("Receiving stream failed: " + error.getMessage(), error); + } + } + } + + // ------------------------------------------------------------------------ + // errors + // ------------------------------------------------------------------------ + + public void notifyOfError(Throwable error) { + if (error != null && this.error == null) { + this.error = error; + + // this should wake up any blocking calls + try { + connectedSocket.close(); + } catch (Throwable ignored) {} + try { + socket.close(); + } catch (Throwable ignored) {} + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java index fab5c9a..10ea85c 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java @@ -18,43 +18,45 @@ package org.apache.flink.contrib.streaming; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.ForkableFlinkMiniCluster; + import org.junit.Test; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.junit.Assert; import java.util.Iterator; +import static org.junit.Assert.*; + /** * This test verifies the behavior of DataStreamUtils.collect. 
*/ public class CollectITCase { @Test - public void testCollect() { - - Configuration config = new Configuration(); - ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false); - cluster.start(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - long N = 10; - DataStream<Long> stream = env.generateSequence(1, N); - - long i = 1; - for(Iterator it = DataStreamUtils.collect(stream); it.hasNext(); ) { - Long x = (Long) it.next(); - if(x != i) { - Assert.fail(String.format("Should have got %d, got %d instead.", i, x)); + public void testCollect() throws Exception { + final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false); + try { + cluster.start(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + final long N = 10; + DataStream<Long> stream = env.generateSequence(1, N); + + long i = 1; + for (Iterator<Long> it = DataStreamUtils.collect(stream); it.hasNext(); ) { + long x = it.next(); + assertEquals("received wrong element", i, x); /* NOTE(review): this per-element check assumes the sequence arrives in order (e.g. source parallelism 1) -- confirm */ + i++; } - i++; + + assertEquals("received wrong number of elements", N + 1, i); /* i ends one past the last element received, so receiving 1..N leaves i == N + 1 */ } - if(i != N + 1) { - Assert.fail(String.format("Should have collected %d numbers, got %d instead.", N, i - 1)); + finally { + cluster.stop(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d3e3bd52/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java new file mode 100644 index 0000000..f8739a3 --- /dev/null +++
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Test; + +import java.net.Socket; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class SocketStreamIteratorTest { + + @Test + public void testIterator() throws Exception { + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final long seed = new Random().nextLong(); + final int numElements = 1000; + + final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE); + + Thread writer = new Thread() { + + @Override + public void run() { + try { + try (Socket sock = new Socket(iterator.getBindAddress(), iterator.getPort()); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) + { + final
TypeSerializer<Long> serializer = LongSerializer.INSTANCE; + final Random rnd = new Random(seed); + + for (int i = 0; i < numElements; i++) { + serializer.serialize(rnd.nextLong(), out); + } + } + } + catch (Throwable t) { + error.set(t); + } + } + }; + + writer.start(); + + final Random validator = new Random(seed); + for (int i = 0; i < numElements; i++) { + assertTrue(iterator.hasNext()); + assertTrue(iterator.hasNext()); + assertEquals(validator.nextLong(), iterator.next().longValue()); + } + + assertFalse(iterator.hasNext()); + writer.join(); + assertNull("writer thread failed: " + error.get(), error.get()); + assertFalse(iterator.hasNext()); + } + + @Test + public void testIteratorWithException() throws Exception { + final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE); + + // asynchronously set an error + new Thread() { + @Override + public void run() { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) {} + iterator.notifyOfError(new Exception("test")); + } + }.start(); + + try { + iterator.hasNext(); + fail("expected the asynchronously reported error to be thrown"); + } + catch (Exception e) { + assertTrue(e.getCause().getMessage().contains("test")); + } + } +}