Move Common Fn Execution Concepts to fn-execution. Move Stream control packages to fn-execution. Update java package names.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9e2be91 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9e2be91 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9e2be91 Branch: refs/heads/master Commit: f9e2be91836fd7fb25ed772a35d9626a7c5a2cc6 Parents: cbb3a73 Author: Thomas Groh <tg...@google.com> Authored: Thu Nov 9 18:31:26 2017 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Wed Nov 22 16:16:41 2017 -0800 ---------------------------------------------------------------------- runners/java-fn-execution/pom.xml | 29 --- .../beam/runners/fnexecution/ServerFactory.java | 2 +- .../runners/fnexecution/ServerFactoryTest.java | 6 +- sdks/java/fn-execution/build.gradle | 10 +- sdks/java/fn-execution/pom.xml | 49 ++-- .../beam/sdk/fn/stream/AdvancingPhaser.java | 36 +++ .../sdk/fn/stream/BufferingStreamObserver.java | 171 +++++++++++++ .../apache/beam/sdk/fn/stream/DataStreams.java | 249 +++++++++++++++++++ .../sdk/fn/stream/DirectStreamObserver.java | 71 ++++++ .../ForwardingClientResponseObserver.java | 68 +++++ .../apache/beam/sdk/fn/stream/package-info.java | 22 ++ .../org/apache/beam/harness/test/Consumer.java | 26 -- .../org/apache/beam/harness/test/Supplier.java | 26 -- .../apache/beam/harness/test/TestExecutors.java | 93 ------- .../beam/harness/test/TestExecutorsTest.java | 175 ------------- .../apache/beam/harness/test/TestStreams.java | 185 -------------- .../beam/harness/test/TestStreamsTest.java | 109 -------- .../beam/sdk/fn/stream/AdvancingPhaserTest.java | 53 ++++ .../fn/stream/BufferingStreamObserverTest.java | 155 ++++++++++++ .../beam/sdk/fn/stream/DataStreamsTest.java | 167 +++++++++++++ .../sdk/fn/stream/DirectStreamObserverTest.java | 145 +++++++++++ .../ForwardingClientResponseObserverTest.java | 60 +++++ .../org/apache/beam/sdk/fn/test/Consumer.java | 26 ++ .../org/apache/beam/sdk/fn/test/Supplier.java | 26 ++ .../apache/beam/sdk/fn/test/TestExecutors.java 
| 93 +++++++ .../beam/sdk/fn/test/TestExecutorsTest.java | 175 +++++++++++++ .../apache/beam/sdk/fn/test/TestStreams.java | 185 ++++++++++++++ .../beam/sdk/fn/test/TestStreamsTest.java | 109 ++++++++ .../org/apache/beam/fn/harness/FnHarness.java | 2 +- .../beam/fn/harness/state/BagUserState.java | 2 +- .../beam/fn/harness/stream/AdvancingPhaser.java | 36 --- .../harness/stream/BufferingStreamObserver.java | 166 ------------- .../beam/fn/harness/stream/DataStreams.java | 229 ----------------- .../fn/harness/stream/DirectStreamObserver.java | 71 ------ .../ForwardingClientResponseObserver.java | 63 ----- .../harness/stream/StreamObserverFactory.java | 20 +- .../fn/harness/BeamFnDataReadRunnerTest.java | 4 +- .../apache/beam/fn/harness/FnHarnessTest.java | 4 +- .../control/BeamFnControlClientTest.java | 2 +- .../fn/harness/control/RegisterHandlerTest.java | 4 +- ...BeamFnDataBufferingOutboundObserverTest.java | 2 +- .../harness/data/BeamFnDataGrpcClientTest.java | 4 +- .../data/BeamFnDataGrpcMultiplexerTest.java | 2 +- .../logging/BeamFnLoggingClientTest.java | 2 +- .../state/BeamFnStateGrpcClientCacheTest.java | 2 +- .../fn/harness/stream/AdvancingPhaserTest.java | 48 ---- .../stream/BufferingStreamObserverTest.java | 146 ----------- .../beam/fn/harness/stream/DataStreamsTest.java | 167 ------------- .../stream/DirectStreamObserverTest.java | 139 ----------- .../ForwardingClientResponseObserverTest.java | 60 ----- .../stream/StreamObserverFactoryTest.java | 3 + 51 files changed, 1865 insertions(+), 1834 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/pom.xml ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index 6ff08b7..3ebcfd0 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -32,35 +32,6 @@ 
<packaging>jar</packaging> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <executions> - <execution> - <id>enforce-banned-dependencies</id> - <goals> - <goal>enforce</goal> - </goals> - <configuration> - <rules> - <bannedDependencies> - <excludes> - <exclude>com.google.guava:guava-jdk5</exclude> - <exclude>com.google.protobuf:protobuf-lite</exclude> - <exclude>org.apache.beam:beam-sdks-java-core</exclude> - </excludes> - </bannedDependencies> - </rules> - <fail>true</fail> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - <dependencies> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java index 93c787d..bb45d08 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java @@ -28,8 +28,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import org.apache.beam.sdk.fn.channel.SocketAddressFactory; import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.fn.channel.SocketAddressFactory; /** * A {@link Server gRPC server} factory. 
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java index e0d7bf9..e9b5fa6 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java @@ -38,13 +38,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; -import org.apache.beam.harness.test.Consumer; -import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +import org.apache.beam.sdk.fn.test.Consumer; +import org.apache.beam.sdk.fn.test.TestStreams; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/build.gradle ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/build.gradle b/sdks/java/fn-execution/build.gradle index 1ffe428..69ec54a 100644 --- a/sdks/java/fn-execution/build.gradle +++ b/sdks/java/fn-execution/build.gradle @@ -21,17 +21,10 @@ applyJavaNature() description = "Apache Beam :: SDKs :: Java :: Fn Execution" -configurations.all { - // Fn Execution contains shared utilities for Runners and Harnesses which use - // the 
Portability framework. Runner-side interactions must not require a - // dependency on any particular SDK, so this library must not introduce such an - // edge. - exclude group: "org.apache.beam", module: "beam-sdks-java-core" -} - dependencies { compile library.java.guava shadow project(path: ":beam-model-parent:beam-model-pipeline", configuration: "shadow") + shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: "shadow") shadow library.java.grpc_core shadow library.java.grpc_stub shadow library.java.grpc_netty @@ -39,6 +32,7 @@ dependencies { testCompile library.java.junit testCompile library.java.hamcrest_core testCompile library.java.hamcrest_library + testCompile library.java.mockito_core } task packageTests(type: Jar) { http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml index 3bdec38..773873e 100644 --- a/sdks/java/fn-execution/pom.xml +++ b/sdks/java/fn-execution/pom.xml @@ -34,39 +34,6 @@ <packaging>jar</packaging> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <executions> - <execution> - <id>enforce-banned-dependencies</id> - <goals> - <goal>enforce</goal> - </goals> - <configuration> - <rules> - <bannedDependencies> - <excludes> - <exclude>com.google.guava:guava-jdk5</exclude> - <exclude>com.google.protobuf:protobuf-lite</exclude> - <!-- Fn Execution contains shared utilities for Runners and Harnesses which use - the Portability framework. Runner-side interactions must not require a - dependency on any particular SDK, so this library must not introduce such an - edge. 
--> - <exclude>org.apache.beam:beam-sdks-java-core</exclude> - </excludes> - </bannedDependencies> - </rules> - <fail>true</fail> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - <dependencies> <dependency> <groupId>org.apache.beam</groupId> @@ -74,6 +41,16 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-core</artifactId> </dependency> @@ -111,5 +88,11 @@ <artifactId>hamcrest-all</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java new file mode 100644 index 0000000..c091705 --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import java.util.concurrent.Phaser; + +/** + * A {@link Phaser} which never terminates. The default {@link Phaser} implementation terminates + * after the first advancement. + */ +public final class AdvancingPhaser extends Phaser { + public AdvancingPhaser(int numParties) { + super(numParties); + } + + @Override + protected boolean onAdvance(int phase, int registeredParties) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java new file mode 100644 index 0000000..b541e5f --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing + * thread responsible for interacting with the underlying {@link CallStreamObserver}. + * + * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser} + * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Callers + * are expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver} + * becomes ready. 
+ */ +@ThreadSafe +public final class BufferingStreamObserver<T> implements StreamObserver<T> { + private static final Object POISON_PILL = new Object(); + private final LinkedBlockingDeque<T> queue; + private final Phaser phaser; + private final CallStreamObserver<T> outboundObserver; + private final Future<?> queueDrainer; + private final int bufferSize; + + public BufferingStreamObserver( + Phaser phaser, + CallStreamObserver<T> outboundObserver, + ExecutorService executor, + int bufferSize) { + this.phaser = phaser; + this.bufferSize = bufferSize; + this.queue = new LinkedBlockingDeque<>(bufferSize); + this.outboundObserver = outboundObserver; + this.queueDrainer = + executor.submit( + new Runnable() { + @Override + public void run() { + drainQueue(); + } + }); + } + + private void drainQueue() { + try { + while (true) { + int currentPhase = phaser.getPhase(); + while (outboundObserver.isReady()) { + T value = queue.take(); + if (value != POISON_PILL) { + outboundObserver.onNext(value); + } else { + return; + } + } + phaser.awaitAdvance(currentPhase); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + @Override + public void onNext(T value) { + try { + // Attempt to add an element to the bounded queue occasionally checking to see + // if the queue drainer is still alive. + while (!queue.offer(value, 60, TimeUnit.SECONDS)) { + checkState(!queueDrainer.isDone(), "Stream observer has finished."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public void onError(Throwable t) { + synchronized (outboundObserver) { + // If we are done, then a previous caller has already shutdown the queue processing thread + // hence we don't need to do it again. 
+ if (!queueDrainer.isDone()) { + // We check to see if we were able to successfully insert the poison pill at the front of + // the queue to cancel the processing thread eagerly or if the processing thread is done. + try { + // We shouldn't attempt to insert into the queue if the queue drainer thread is done + // since the queue may be full and nothing will be emptying it. + while (!queueDrainer.isDone() + && !queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + waitTillFinish(); + } + outboundObserver.onError(t); + } + } + + @Override + public void onCompleted() { + synchronized (outboundObserver) { + // If we are done, then a previous caller has already shutdown the queue processing thread + // hence we don't need to do it again. + if (!queueDrainer.isDone()) { + // We check to see if we were able to successfully insert the poison pill at the end of + // the queue forcing the remainder of the elements to be processed or if the processing + // thread is done. + try { + // We shouldn't attempt to insert into the queue if the queue drainer thread is done + // since the queue may be full and nothing will be emptying it. 
+ while (!queueDrainer.isDone() + && !queue.offerLast((T) POISON_PILL, 60, TimeUnit.SECONDS)) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + waitTillFinish(); + } + outboundObserver.onCompleted(); + } + } + + @VisibleForTesting + public int getBufferSize() { + return bufferSize; + } + + private void waitTillFinish() { + try { + queueDrainer.get(); + } catch (CancellationException e) { + // Cancellation is expected + return; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java new file mode 100644 index 0000000..35abc4c --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.fn.stream; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingInputStream; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PushbackInputStream; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import org.apache.beam.sdk.coders.Coder; + +/** + * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and + * {@link #outbound(OutputChunkConsumer)} treats a single {@link OutputStream} as multiple + * {@link ByteString}s. + */ +// TODO: Migrate logic from BeamFnDataBufferingOutboundObserver to support Outbound +public class DataStreams { + /** + * Converts multiple {@link ByteString}s into a single {@link InputStream}. + * + * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until + * either it knows that no more values will be provided or it has the next {@link ByteString}. + */ + public static InputStream inbound(Iterator<ByteString> bytes) { + return new Inbound(bytes); + } + + /** + * Converts a single {@link OutputStream} into multiple {@link ByteString ByteStrings}. + */ + public static OutputStream outbound(OutputChunkConsumer<ByteString> consumer) { + // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver + throw new UnsupportedOperationException(); + } + + /** + * Reads chunks of output. + * + * @deprecated Used as a temporary placeholder until implementation of + * {@link #outbound(OutputChunkConsumer)}. + */ + @Deprecated + public interface OutputChunkConsumer<T> { + void read(T chunk) throws Exception; + } + + /** + * An input stream which concatenates multiple {@link ByteString}s. 
Lazily accesses the + * first {@link Iterator} on first access of this input stream. + * + * <p>Closing this input stream has no effect. + */ + private static class Inbound<T> extends InputStream { + private static final InputStream EMPTY_STREAM = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + + private final Iterator<ByteString> bytes; + private InputStream currentStream; + + public Inbound(Iterator<ByteString> bytes) { + this.currentStream = EMPTY_STREAM; + this.bytes = bytes; + } + + @Override + public int read() throws IOException { + int rval = -1; + // Move on to the next stream if we have read nothing + while ((rval = currentStream.read()) == -1 && bytes.hasNext()) { + currentStream = bytes.next().newInput(); + } + return rval; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int remainingLen = len; + while ((remainingLen -= ByteStreams.read( + currentStream, b, off + len - remainingLen, remainingLen)) > 0) { + if (bytes.hasNext()) { + currentStream = bytes.next().newInput(); + } else { + int bytesRead = len - remainingLen; + return bytesRead > 0 ? bytesRead : -1; + } + } + return len - remainingLen; + } + } + + /** + * An adapter which converts an {@link InputStream} to an {@link Iterator} of {@code T} values + * using the specified {@link Coder}. + * + * <p>Note that this adapter follows the Beam Fn API specification for forcing values that decode + * consuming zero bytes to consuming exactly one byte. + * + * <p>Note that access to the underlying {@link InputStream} is lazy and will only be invoked on + * first access to {@link #next()} or {@link #hasNext()}. 
+ */ + public static class DataStreamDecoder<T> implements Iterator<T> { + private enum State { READ_REQUIRED, HAS_NEXT, EOF }; + + private final CountingInputStream countingInputStream; + private final PushbackInputStream pushbackInputStream; + private final Coder<T> coder; + private State currentState; + private T next; + public DataStreamDecoder(Coder<T> coder, InputStream inputStream) { + this.currentState = State.READ_REQUIRED; + this.coder = coder; + this.pushbackInputStream = new PushbackInputStream(inputStream, 1); + this.countingInputStream = new CountingInputStream(pushbackInputStream); + } + + @Override + public boolean hasNext() { + switch (currentState) { + case EOF: + return false; + case READ_REQUIRED: + try { + int nextByte = pushbackInputStream.read(); + if (nextByte == -1) { + currentState = State.EOF; + return false; + } + + pushbackInputStream.unread(nextByte); + long count = countingInputStream.getCount(); + next = coder.decode(countingInputStream); + // Skip one byte if decoding the value consumed 0 bytes. + if (countingInputStream.getCount() - count == 0) { + checkState(countingInputStream.read() != -1, "Unexpected EOF reached"); + } + currentState = State.HAS_NEXT; + } catch (IOException e) { + throw new IllegalStateException(e); + } + return true; + case HAS_NEXT: + return true; + } + throw new IllegalStateException(String.format("Unknown state %s", currentState)); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + currentState = State.READ_REQUIRED; + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + /** + * Allows for one or more writing threads to append values to this iterator while one reading + * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is + * available or this has been closed. 
+ * + * <p>External synchronization must be provided if multiple readers would like to access the + * {@link Iterator#hasNext()} and {@link Iterator#next()} methods. + * + * <p>The order or values which are appended to this iterator is nondeterministic when multiple + * threads call {@link #accept(Object)}. + */ + public static class BlockingQueueIterator<T> + implements AutoCloseable, Iterator<T> { + private static final Object POISION_PILL = new Object(); + private final BlockingQueue<T> queue; + + /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */ + private T currentElement; + + public BlockingQueueIterator(BlockingQueue<T> queue) { + this.queue = queue; + } + + @Override + public void close() throws Exception { + queue.put((T) POISION_PILL); + } + + public void accept(T t) throws Exception { + queue.put(t); + } + + @Override + public boolean hasNext() { + if (currentElement == null) { + try { + currentElement = queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + return currentElement != POISION_PILL; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + T rval = currentElement; + currentElement = null; + return rval; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java new file mode 100644 index 0000000..eb7183f --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java @@ -0,0 
+1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.Phaser; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A {@link StreamObserver} which uses synchronization on the underlying + * {@link CallStreamObserver} to provide thread safety. + * + * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser} + * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. + * Creator is expected to advance the {@link Phaser} whenever the underlying + * {@link CallStreamObserver} becomes ready. 
+ */ +@ThreadSafe +public final class DirectStreamObserver<T> implements StreamObserver<T> { + private final Phaser phaser; + private final CallStreamObserver<T> outboundObserver; + + public DirectStreamObserver( + Phaser phaser, + CallStreamObserver<T> outboundObserver) { + this.phaser = phaser; + this.outboundObserver = outboundObserver; + } + + @Override + public void onNext(T value) { + int phase = phaser.getPhase(); + if (!outboundObserver.isReady()) { + phaser.awaitAdvance(phase); + } + synchronized (outboundObserver) { + outboundObserver.onNext(value); + } + } + + @Override + public void onError(Throwable t) { + synchronized (outboundObserver) { + outboundObserver.onError(t); + } + } + + @Override + public void onCompleted() { + synchronized (outboundObserver) { + outboundObserver.onCompleted(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java new file mode 100644 index 0000000..958c69b --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; + +/** + * A {@link ClientResponseObserver} which delegates all {@link StreamObserver} calls. + * + * <p>Used to wrap existing {@link StreamObserver}s to be able to install an + * {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}. + * + * <p>This is as thread-safe as the underlying stream observer that is being wrapped. + */ +public final class ForwardingClientResponseObserver<ReqT, RespT> + implements ClientResponseObserver<RespT, ReqT> { + public static <ReqT, RespT> ForwardingClientResponseObserver<ReqT, RespT> create( + StreamObserver<ReqT> inbound, Runnable onReadyHandler) { + return new ForwardingClientResponseObserver<>(inbound, onReadyHandler); + } + + private final Runnable onReadyHandler; + private final StreamObserver<ReqT> inboundObserver; + + ForwardingClientResponseObserver( + StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler) { + this.inboundObserver = inboundObserver; + this.onReadyHandler = onReadyHandler; + } + + @Override + public void onNext(ReqT value) { + inboundObserver.onNext(value); + } + + @Override + public void onError(Throwable t) { + inboundObserver.onError(t); + } + + @Override + public void onCompleted() { + inboundObserver.onCompleted(); + } + + @Override + public void beforeStart(ClientCallStreamObserver<RespT> stream) { + stream.setOnReadyHandler(onReadyHandler); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java new file mode 100644 index 0000000..6aa2729 --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * gRPC stream management. 
+ */ +package org.apache.beam.sdk.fn.stream; http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java deleted file mode 100644 index 279fc29..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.test; - -/** - * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers. 
- */ -public interface Consumer<T> { - void accept(T item); -} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java deleted file mode 100644 index 629afc2..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.test; - -/** - * A fork of the Java 8 Supplier interface, to enable migrations. 
- */ -public interface Supplier<T> { - T get(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java deleted file mode 100644 index ca12d5a..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.test; - -import com.google.common.util.concurrent.ForwardingExecutorService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -/** - * A {@link TestRule} that validates that all submitted tasks finished and were completed. This - * allows for testing that tasks have exercised the appropriate shutdown logic. 
- */ -public class TestExecutors { - public static TestExecutorService from(final ExecutorService staticExecutorService) { - return from(new Supplier<ExecutorService>() { - @Override - public ExecutorService get() { - return staticExecutorService; - } - }); - } - - public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) { - return new FromSupplier(executorServiceSuppler); - } - - /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */ - public interface TestExecutorService extends ExecutorService, TestRule {} - - private static class FromSupplier extends ForwardingExecutorService - implements TestExecutorService { - private final Supplier<ExecutorService> executorServiceSupplier; - private ExecutorService delegate; - - private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) { - this.executorServiceSupplier = executorServiceSupplier; - } - - @Override - public Statement apply(final Statement statement, Description arg1) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - Throwable thrown = null; - delegate = executorServiceSupplier.get(); - try { - statement.evaluate(); - } catch (Throwable t) { - thrown = t; - } - shutdown(); - if (!awaitTermination(5, TimeUnit.SECONDS)) { - shutdownNow(); - IllegalStateException e = - new IllegalStateException("Test executor failed to shutdown cleanly."); - if (thrown != null) { - thrown.addSuppressed(e); - } else { - thrown = e; - } - } - if (thrown != null) { - throw thrown; - } - } - }; - } - - @Override - protected ExecutorService delegate() { - return delegate; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java deleted file mode 100644 index f0c98e0..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.beam.harness.test.TestExecutors.TestExecutorService; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.junit.runners.model.Statement; - -/** Tests for {@link TestExecutors}. 
*/ -@RunWith(JUnit4.class) -public class TestExecutorsTest { - @Test - public void testSuccessfulTermination() throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(service); - final AtomicBoolean taskRan = new AtomicBoolean(); - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - testService.submit(new Runnable() { - @Override - public void run() { - taskRan.set(true); - } - }); - } - }, - null) - .evaluate(); - assertTrue(service.isTerminated()); - assertTrue(taskRan.get()); - } - - @Test - public void testTaskBlocksForeverCausesFailure() throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(service); - final AtomicBoolean taskStarted = new AtomicBoolean(); - final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); - try { - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - testService.submit(new Runnable() { - @Override - public void run() { - taskToRun(); - } - }); - } - - private void taskToRun() { - taskStarted.set(true); - try { - while (true) { - Thread.sleep(10000); - } - } catch (InterruptedException e) { - taskWasInterrupted.set(true); - return; - } - } - }, - null) - .evaluate(); - fail(); - } catch (IllegalStateException e) { - assertEquals(IllegalStateException.class, e.getClass()); - assertEquals("Test executor failed to shutdown cleanly.", e.getMessage()); - } - assertTrue(service.isShutdown()); - } - - @Test - public void testStatementFailurePropagatedCleanly() throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(service); - final RuntimeException exceptionToThrow = new RuntimeException(); - try { - testService - .apply( - new Statement() { - @Override - public void evaluate() 
throws Throwable { - throw exceptionToThrow; - } - }, - null) - .evaluate(); - fail(); - } catch (RuntimeException thrownException) { - assertSame(exceptionToThrow, thrownException); - } - assertTrue(service.isShutdown()); - } - - @Test - public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate() - throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(service); - final AtomicBoolean taskStarted = new AtomicBoolean(); - final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); - final RuntimeException exceptionToThrow = new RuntimeException(); - try { - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - testService.submit(new Runnable() { - @Override - public void run() { - taskToRun(); - } - }); - throw exceptionToThrow; - } - - private void taskToRun() { - taskStarted.set(true); - try { - while (true) { - Thread.sleep(10000); - } - } catch (InterruptedException e) { - taskWasInterrupted.set(true); - return; - } - } - }, - null) - .evaluate(); - fail(); - } catch (RuntimeException thrownException) { - assertSame(exceptionToThrow, thrownException); - assertEquals(1, exceptionToThrow.getSuppressed().length); - assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass()); - assertEquals( - "Test executor failed to shutdown cleanly.", - exceptionToThrow.getSuppressed()[0].getMessage()); - } - assertTrue(service.isShutdown()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java deleted file mode 100644 index 3df743a..0000000 --- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.test; - -import io.grpc.stub.CallStreamObserver; -import io.grpc.stub.StreamObserver; - -/** Utility methods which enable testing of {@link StreamObserver}s. */ -public class TestStreams { - /** - * Creates a test {@link CallStreamObserver} {@link Builder} that forwards - * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}. - */ - public static <T> Builder<T> withOnNext(Consumer<T> onNext) { - return new Builder<>(new ForwardingCallStreamObserver<>( - onNext, - TestStreams.<Throwable>noopConsumer(), - TestStreams.noopRunnable(), - TestStreams.alwaysTrueSupplier())); - } - - /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */ - public static class Builder<T> { - private final ForwardingCallStreamObserver<T> observer; - private Builder(ForwardingCallStreamObserver<T> observer) { - this.observer = observer; - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link CallStreamObserver#isReady} callback. 
- */ - public Builder<T> withIsReady(Supplier<Boolean> isReady) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, - observer.onError, - observer.onCompleted, - isReady)); - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link StreamObserver#onCompleted} callback. - */ - public Builder<T> withOnCompleted(Runnable onCompleted) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, - observer.onError, - onCompleted, - observer.isReady)); - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link StreamObserver#onError} callback. - */ - public Builder<T> withOnError(final Runnable onError) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, - new Consumer<Throwable>() { - @Override - public void accept(Throwable t) { - onError.run(); - } - }, - observer.onCompleted, - observer.isReady)); - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link StreamObserver#onError} consumer. - */ - public Builder<T> withOnError(Consumer<Throwable> onError) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, onError, observer.onCompleted, observer.isReady)); - } - - public CallStreamObserver<T> build() { - return observer; - } - } - - private static void noop() { - } - - private static Runnable noopRunnable() { - return new Runnable() { - @Override - public void run() { - } - }; - } - - private static void noop(Throwable t) { - } - - private static <T> Consumer<T> noopConsumer() { - return new Consumer<T>() { - @Override - public void accept(T item) { - } - }; - } - - private static boolean returnTrue() { - return true; - } - - private static Supplier<Boolean> alwaysTrueSupplier() { - return new Supplier<Boolean>() { - @Override - public Boolean get() { - return true; - } - }; - } - - /** A {@link CallStreamObserver} which executes the supplied callbacks. 
*/ - private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> { - private final Consumer<T> onNext; - private final Supplier<Boolean> isReady; - private final Consumer<Throwable> onError; - private final Runnable onCompleted; - - public ForwardingCallStreamObserver( - Consumer<T> onNext, - Consumer<Throwable> onError, - Runnable onCompleted, - Supplier<Boolean> isReady) { - this.onNext = onNext; - this.onError = onError; - this.onCompleted = onCompleted; - this.isReady = isReady; - } - - @Override - public void onNext(T value) { - onNext.accept(value); - } - - @Override - public void onError(Throwable t) { - onError.accept(t); - } - - @Override - public void onCompleted() { - onCompleted.run(); - } - - @Override - public boolean isReady() { - return isReady.get(); - } - - @Override - public void setOnReadyHandler(Runnable onReadyHandler) {} - - @Override - public void disableAutoInboundFlowControl() {} - - @Override - public void request(int count) {} - - @Override - public void setMessageCompression(boolean enable) {} - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java deleted file mode 100644 index c578397..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link TestStreams}. 
*/ -@RunWith(JUnit4.class) -public class TestStreamsTest { - @Test - public void testOnNextIsCalled() { - final AtomicBoolean onNextWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(new Consumer<Boolean>() { - @Override - public void accept(Boolean item) { - onNextWasCalled.set(item); - } - }).build().onNext(true); - assertTrue(onNextWasCalled.get()); - } - - @Test - public void testIsReadyIsCalled() { - final AtomicBoolean isReadyWasCalled = new AtomicBoolean(); - assertFalse(TestStreams.withOnNext(null) - .withIsReady(new Supplier<Boolean>() { - @Override - public Boolean get() { - return isReadyWasCalled.getAndSet(true); - } - }) - .build() - .isReady()); - assertTrue(isReadyWasCalled.get()); - } - - @Test - public void testOnCompletedIsCalled() { - final AtomicBoolean onCompletedWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(null) - .withOnCompleted(new Runnable() { - @Override - public void run() { - onCompletedWasCalled.set(true); - } - }) - .build() - .onCompleted(); - assertTrue(onCompletedWasCalled.get()); - } - - @Test - public void testOnErrorRunnableIsCalled() { - RuntimeException throwable = new RuntimeException(); - final AtomicBoolean onErrorWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(null) - .withOnError(new Runnable() { - @Override - public void run() { - onErrorWasCalled.set(true); - } - }) - .build() - .onError(throwable); - assertTrue(onErrorWasCalled.get()); - } - - @Test - public void testOnErrorConsumerIsCalled() { - RuntimeException throwable = new RuntimeException(); - final Collection<Throwable> onErrorWasCalled = new ArrayList<>(); - TestStreams.withOnNext(null) - .withOnError(new Consumer<Throwable>() { - @Override - public void accept(Throwable item) { - onErrorWasCalled.add(item); - } - }) - .build() - .onError(throwable); - assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable)); - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java new file mode 100644 index 0000000..3248ab2 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import static org.hamcrest.collection.IsEmptyCollection.empty; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link AdvancingPhaser}. 
*/ +@RunWith(JUnit4.class) +public class AdvancingPhaserTest { + @Test + public void testAdvancement() throws Exception { + final AdvancingPhaser phaser = new AdvancingPhaser(1); + int currentPhase = phaser.getPhase(); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(new Runnable() { + @Override + public void run() { + phaser.arrive(); + } + }); + phaser.awaitAdvance(currentPhase); + assertFalse(phaser.isTerminated()); + service.shutdown(); + if (!service.awaitTermination(10, TimeUnit.SECONDS)) { + assertThat(service.shutdownNow(), empty()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java new file mode 100644 index 0000000..54d02b8 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.fn.test.Consumer; +import org.apache.beam.sdk.fn.test.Supplier; +import org.apache.beam.sdk.fn.test.TestExecutors; +import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService; +import org.apache.beam.sdk.fn.test.TestStreams; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BufferingStreamObserver}. */ +@RunWith(JUnit4.class) +public class BufferingStreamObserverTest { + @Rule public TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool()); + + @Test + public void testThreadSafety() throws Exception { + final List<String> onNextValues = new ArrayList<>(); + AdvancingPhaser phaser = new AdvancingPhaser(1); + final AtomicBoolean isCriticalSectionShared = new AtomicBoolean(); + final BufferingStreamObserver<String> streamObserver = + new BufferingStreamObserver<>( + phaser, + TestStreams.withOnNext( + new Consumer<String>() { + @Override + public void accept(String t) { + // Use the atomic boolean to detect if multiple threads are in this + // critical section. Any thread that enters purposefully blocks by sleeping + // to increase the contention between threads artificially. 
+ assertFalse(isCriticalSectionShared.getAndSet(true)); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + onNextValues.add(t); + assertTrue(isCriticalSectionShared.getAndSet(false)); + } + }).build(), + executor, + 3); + + List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4"); + List<Callable<String>> tasks = new ArrayList<>(); + for (final String prefix : prefixes) { + tasks.add( + new Callable<String>() { + @Override + public String call() throws Exception { + for (int i = 0; i < 10; i++) { + streamObserver.onNext(prefix + i); + } + return prefix; + } + }); + } + List<Future<String>> results = executor.invokeAll(tasks); + for (Future<String> result : results) { + result.get(); + } + streamObserver.onCompleted(); + + // Check that order was maintained. + int[] prefixesIndex = new int[prefixes.size()]; + assertEquals(50, onNextValues.size()); + for (String onNextValue : onNextValues) { + int prefix = Integer.parseInt(onNextValue.substring(0, 1)); + int suffix = Integer.parseInt(onNextValue.substring(1, 2)); + assertEquals(prefixesIndex[prefix], suffix); + prefixesIndex[prefix] += 1; + } + } + + @Test + public void testIsReadyIsHonored() throws Exception { + AdvancingPhaser phaser = new AdvancingPhaser(1); + final AtomicBoolean elementsAllowed = new AtomicBoolean(); + final BufferingStreamObserver<String> streamObserver = + new BufferingStreamObserver<>( + phaser, + TestStreams.withOnNext( + new Consumer<String>() { + @Override + public void accept(String t) { + assertTrue(elementsAllowed.get()); + } + }) + .withIsReady( + new Supplier<Boolean>() { + @Override + public Boolean get() { + return elementsAllowed.get(); + } + }) + .build(), + executor, + 3); + + // Start all the tasks + List<Future<String>> results = new ArrayList<>(); + for (final String prefix : ImmutableList.of("0", "1", "2", "3", "4")) { + results.add( + executor.submit( + new Callable<String>() { + @Override + public String call() throws Exception { + for (int i = 0; 
i < 10; i++) { + streamObserver.onNext(prefix + i); + } + return prefix; + } + })); + } + + // Have them wait and then flip that we do allow elements and wake up those awaiting + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + elementsAllowed.set(true); + phaser.arrive(); + + for (Future<String> result : results) { + result.get(); + } + streamObserver.onCompleted(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java new file mode 100644 index 0000000..852b3d0 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.fn.stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assume.assumeTrue; + +import com.google.common.collect.Iterators; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.fn.stream.DataStreams.BlockingQueueIterator; +import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataStreams}. */ +@RunWith(Enclosed.class) +public class DataStreamsTest { + + /** Tests for {@link DataStreams.Inbound}. 
*/ + @RunWith(JUnit4.class) + public static class InboundTest { + private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData"); + private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData"); + + @Test + public void testEmptyRead() throws Exception { + assertEquals(ByteString.EMPTY, read()); + assertEquals(ByteString.EMPTY, read(ByteString.EMPTY)); + assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY)); + } + + @Test + public void testRead() throws Exception { + assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B)); + assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B)); + assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY)); + } + + private static ByteString read(ByteString... bytes) throws IOException { + return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator())); + } + } + + /** Tests for {@link DataStreams.BlockingQueueIterator}. */ + @RunWith(JUnit4.class) + public static class BlockingQueueIteratorTest { + @Test(timeout = 10_000) + public void testBlockingQueueIteratorWithoutBlocking() throws Exception { + BlockingQueueIterator<String> iterator = + new BlockingQueueIterator<>(new ArrayBlockingQueue<String>(3)); + + iterator.accept("A"); + iterator.accept("B"); + iterator.close(); + + assertEquals(Arrays.asList("A", "B"), + Arrays.asList(Iterators.toArray(iterator, String.class))); + } + + @Test(timeout = 10_000) + public void testBlockingQueueIteratorWithBlocking() throws Exception { + // The synchronous queue only allows for one element to transfer at a time and blocks + // the sending/receiving parties until both parties are there. 
+ final BlockingQueueIterator<String> iterator = + new BlockingQueueIterator<>(new SynchronousQueue<String>()); + final SettableFuture<List<String>> valuesFuture = SettableFuture.create(); + Thread appender = new Thread() { + @Override + public void run() { + valuesFuture.set(Arrays.asList(Iterators.toArray(iterator, String.class))); + } + }; + appender.start(); + iterator.accept("A"); + iterator.accept("B"); + iterator.close(); + assertEquals(Arrays.asList("A", "B"), valuesFuture.get()); + appender.join(); + } + } + + /** Tests for {@link DataStreams.DataStreamDecoder}. */ + @RunWith(JUnit4.class) + public static class DataStreamDecoderTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testEmptyInputStream() throws Exception { + testDecoderWith(StringUtf8Coder.of()); + } + + @Test + public void testNonEmptyInputStream() throws Exception { + testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ"); + } + + @Test + public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception { + CountingOutputStream countingOutputStream = + new CountingOutputStream(ByteStreams.nullOutputStream()); + GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, countingOutputStream); + assumeTrue(countingOutputStream.getCount() == 0); + + testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE); + } + + private <T> void testDecoderWith(Coder<T> coder, T... 
expected) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (T value : expected) { + int size = baos.size(); + coder.encode(value, baos); + // Pad an arbitrary byte when values encode to zero bytes + if (baos.size() - size == 0) { + baos.write(0); + } + } + + Iterator<T> decoder = + new DataStreamDecoder<>(coder, new ByteArrayInputStream(baos.toByteArray())); + + Object[] actual = Iterators.toArray(decoder, Object.class); + assertArrayEquals(expected, actual); + + assertFalse(decoder.hasNext()); + assertFalse(decoder.hasNext()); + + thrown.expect(NoSuchElementException.class); + decoder.next(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java new file mode 100644 index 0000000..d59dfbc --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.fn.test.Consumer; +import org.apache.beam.sdk.fn.test.Supplier; +import org.apache.beam.sdk.fn.test.TestExecutors; +import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService; +import org.apache.beam.sdk.fn.test.TestStreams; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DirectStreamObserver}. */ +@RunWith(JUnit4.class) +public class DirectStreamObserverTest { + @Rule public TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool()); + + @Test + public void testThreadSafety() throws Exception { + final List<String> onNextValues = new ArrayList<>(); + AdvancingPhaser phaser = new AdvancingPhaser(1); + final AtomicBoolean isCriticalSectionShared = new AtomicBoolean(); + final DirectStreamObserver<String> streamObserver = + new DirectStreamObserver<>( + phaser, + TestStreams.withOnNext( + new Consumer<String>() { + @Override + public void accept(String t) { + // Use the atomic boolean to detect if multiple threads are in this + // critical section. Any thread that enters purposefully blocks by sleeping + // to increase the contention between threads artificially. 
+ assertFalse(isCriticalSectionShared.getAndSet(true)); + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + onNextValues.add(t); + assertTrue(isCriticalSectionShared.getAndSet(false)); + } + }).build()); + + List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4"); + List<Callable<String>> tasks = new ArrayList<>(); + for (final String prefix : prefixes) { + tasks.add( + new Callable<String>() { + @Override + public String call() throws Exception { + for (int i = 0; i < 10; i++) { + streamObserver.onNext(prefix + i); + } + return prefix; + } + }); + } + executor.invokeAll(tasks); + streamObserver.onCompleted(); + + // Check that order was maintained. + int[] prefixesIndex = new int[prefixes.size()]; + assertEquals(50, onNextValues.size()); + for (String onNextValue : onNextValues) { + int prefix = Integer.parseInt(onNextValue.substring(0, 1)); + int suffix = Integer.parseInt(onNextValue.substring(1, 2)); + assertEquals(prefixesIndex[prefix], suffix); + prefixesIndex[prefix] += 1; + } + } + + @Test + public void testIsReadyIsHonored() throws Exception { + AdvancingPhaser phaser = new AdvancingPhaser(1); + final AtomicBoolean elementsAllowed = new AtomicBoolean(); + final DirectStreamObserver<String> streamObserver = + new DirectStreamObserver<>( + phaser, + TestStreams.withOnNext( + new Consumer<String>() { + @Override + public void accept(String t) { + assertTrue(elementsAllowed.get()); + } + }).withIsReady(new Supplier<Boolean>() { + @Override + public Boolean get() { + return elementsAllowed.get(); + } + }).build()); + + // Start all the tasks + List<Future<String>> results = new ArrayList<>(); + for (final String prefix : ImmutableList.of("0", "1", "2", "3", "4")) { + results.add( + executor.submit( + new Callable<String>() { + @Override + public String call() throws Exception { + for (int i = 0; i < 10; i++) { + streamObserver.onNext(prefix + i); + } + return prefix; + } + })); + } + + // Have them wait and then flip that we do allow 
elements and wake up those awaiting + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + elementsAllowed.set(true); + phaser.arrive(); + + for (Future<String> result : results) { + result.get(); + } + streamObserver.onCompleted(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java new file mode 100644 index 0000000..a71841d --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.fn.stream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ForwardingClientResponseObserver}. */ +@RunWith(JUnit4.class) +public class ForwardingClientResponseObserverTest { + @Test + public void testCallsAreForwardedAndOnReadyHandlerBound() { + @SuppressWarnings("unchecked") + StreamObserver<Object> delegateObserver = mock(StreamObserver.class); + @SuppressWarnings("unchecked") + ClientCallStreamObserver<Object> callStreamObserver = + mock(ClientCallStreamObserver.class); + Runnable onReadyHandler = new Runnable() { + @Override + public void run() { + } + }; + ClientResponseObserver<Object, Object> observer = + new ForwardingClientResponseObserver<>(delegateObserver, onReadyHandler); + observer.onNext("A"); + verify(delegateObserver).onNext("A"); + Throwable t = new RuntimeException(); + observer.onError(t); + verify(delegateObserver).onError(t); + observer.onCompleted(); + verify(delegateObserver).onCompleted(); + observer.beforeStart(callStreamObserver); + verify(callStreamObserver).setOnReadyHandler(onReadyHandler); + verifyNoMoreInteractions(delegateObserver, callStreamObserver); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java new file mode 100644 index 0000000..184a6e2 --- /dev/null +++ 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.test; + +/** + * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers. + */ +public interface Consumer<T> { + void accept(T item); +} http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java new file mode 100644 index 0000000..b0bae2e --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.test; + +/** + * A fork of the Java 8 Supplier interface, to enable migrations. + */ +public interface Supplier<T> { + T get(); +}