Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle merged PR #32774:
URL: https://github.com/apache/beam/pull/32774

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844393236

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      markClosedOrThrow();
-      outboundObserver.onError(t);
+      if (!isClosed) {

Review Comment:
done, also caught the `IllegalStateException` in `AbstractWindmillStream#halfClose`
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844329618

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
@@ -132,6 +132,10 @@ private static Optional tryParseDirectEndpointIntoIpV6Address(
             directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
   }
 
+  public final boolean isEmpty() {
+    return equals(none());

Review Comment:
removed, but made `none()` a singleton
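The change described here (returning one cached empty value from `none()` instead of building a new one on every call) can be sketched in plain Java. `Endpoints` is a hypothetical, simplified stand-in for `WindmillEndpoints`, which has more fields in Beam; only the singleton-plus-value-equality idea is illustrated.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified stand-in for WindmillEndpoints; not Beam's class. */
final class Endpoints {
  // Built once and shared, so none() no longer allocates on every call.
  private static final Endpoints NONE =
      new Endpoints(Collections.emptyMap(), Collections.emptySet());

  private final Map<String, String> globalDataEndpoints;
  private final Set<String> windmillEndpoints;

  Endpoints(Map<String, String> globalDataEndpoints, Set<String> windmillEndpoints) {
    this.globalDataEndpoints = Collections.unmodifiableMap(globalDataEndpoints);
    this.windmillEndpoints = Collections.unmodifiableSet(windmillEndpoints);
  }

  static Endpoints none() {
    return NONE;
  }

  // Value equality, so an empty instance built elsewhere still equals none().
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof Endpoints)) {
      return false;
    }
    Endpoints other = (Endpoints) o;
    return globalDataEndpoints.equals(other.globalDataEndpoints)
        && windmillEndpoints.equals(other.windmillEndpoints);
  }

  @Override
  public int hashCode() {
    return Objects.hash(globalDataEndpoints, windmillEndpoints);
  }
}
```

With `isEmpty()` removed, a caller that needs the check can write `endpoints.equals(Endpoints.none())`. Keeping value-based `equals` matters: an empty instance parsed from a backend response is a different object from the cached one, so a reference comparison (`==`) would miss it.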
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844356091

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
@@ -299,24 +303,39 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
+  @CanIgnoreReturnValue
+  private ImmutableList<CompletableFuture<Void>> closeStreamsNotIn(
+      WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    currentBackends.windmillStreams().entrySet().stream()
-        .filter(
-            connectionAndStream ->
-                !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
-        .forEach(
-            entry ->
-                windmillStreamManager.execute(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue())));
+    List<CompletableFuture<Void>> closeStreamFutures =
+        currentBackends.windmillStreams().entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillEndpoints
+                        .windmillEndpoints()
+                        .contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                        windmillStreamManager))
+            .collect(Collectors.toList());
 
     Set newGlobalDataEndpoints =
         new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
-    currentBackends.globalDataStreams().values().stream()
-        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
-        .forEach(
-            sender ->
-                windmillStreamManager.execute(() -> closeStreamSender(sender.endpoint(), sender)));
+    List<CompletableFuture<Void>> closeGlobalDataStreamFutures =
+        currentBackends.globalDataStreams().values().stream()
+            .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
+            .map(
+                sender ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager))
+            .collect(Collectors.toList());
+
+    return ImmutableList.<CompletableFuture<Void>>builder()

Review Comment:
done
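The change above replaces fire-and-forget `windmillStreamManager.execute(...)` calls with `CompletableFuture.runAsync(..., windmillStreamManager)` and returns the futures, which is what makes `@CanIgnoreReturnValue` fit: production callers can ignore them, while a test can wait for every close to finish. A minimal JDK-only sketch of that shape, assuming endpoints are plain strings and closing a stream is a `Consumer<String>` (both hypothetical simplifications of the Beam types):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/** Hypothetical sketch; endpoints are plain strings rather than Beam's endpoint types. */
final class StreamCloser {
  /**
   * Closes, on {@code executor}, every stream whose endpoint is absent from {@code newEndpoints}.
   * Returning the futures lets callers wait for the closes; ignoring them keeps the old
   * fire-and-forget behavior.
   */
  static List<CompletableFuture<Void>> closeStreamsNotIn(
      Set<String> currentEndpoints,
      Set<String> newEndpoints,
      Consumer<String> closeStream,
      Executor executor) {
    return currentEndpoints.stream()
        .filter(endpoint -> !newEndpoints.contains(endpoint))
        .map(endpoint -> CompletableFuture.runAsync(() -> closeStream.accept(endpoint), executor))
        .collect(Collectors.toList());
  }

  /** Blocks until every close has finished. */
  static void awaitAll(List<CompletableFuture<Void>> futures) {
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
  }
}
```

`CompletableFuture.allOf(...)` completes only when every input future has completed, so `awaitAll` returns after all closes have run; if any close threw, `join()` rethrows it wrapped in a `CompletionException`.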
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844328828

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
@@ -104,6 +104,10 @@ synchronized void poison() {
     }
   }
 
+  synchronized boolean hasReceivedPoisonPill() {

Review Comment:
removed
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844337008

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -89,6 +90,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
           throw new StreamObserverCancelledException("StreamObserver was terminated.");
         }
 
+        // We close under "lock", so this should never happen.

Review Comment:
done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844336841

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
       // If the delegate above was already terminated via onError or onComplete from another
       // thread.
       logger.warn("StreamObserver was previously cancelled.", e);
+    } catch (RuntimeException ignored) {
+      logger.warn("StreamObserver was unexpectedly cancelled.", e);

Review Comment:
done
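In the quoted hunk, the new `catch (RuntimeException ignored)` branch logs `e` (the original failure) and drops the exception it just caught, which is the issue scwhittle raises in his review (log both, with a message like "encountered error {} when cancelling due to error"). A dependency-free sketch of logging both, using `java.util.logging` instead of the `logger` in the diff, with a hypothetical `Delegate` interface in place of the gRPC observer:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of cancelling a delegate after a failed send; not Beam code. */
final class CancelOnError {
  private static final Logger LOG = Logger.getLogger(CancelOnError.class.getName());

  /** Minimal stand-in for the delegate observer (assumption). */
  interface Delegate {
    void onError(Throwable t);
  }

  /**
   * Cancels {@code delegate} because of {@code sendFailure}. If cancelling itself throws, both
   * exceptions are logged and the secondary one is returned; otherwise returns null.
   */
  static RuntimeException cancel(Delegate delegate, RuntimeException sendFailure) {
    try {
      delegate.onError(sendFailure);
      return null;
    } catch (RuntimeException cancelFailure) {
      // Name the secondary error in the message; attach the original as the logged throwable.
      LOG.log(
          Level.WARNING,
          "Encountered error " + cancelFailure + " when cancelling due to error",
          sendFailure);
      return cancelFailure;
    }
  }
}
```

With SLF4J the same line could be `logger.warn("encountered error {} when cancelling due to error", ignored, e)`: a trailing `Throwable` beyond the placeholders is logged with its stack trace.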
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844328452

## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+
+@RunWith(JUnit4.class)
+public class GrpcCommitWorkStreamTest {
+  private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest";
+  private static final Windmill.JobHeader TEST_JOB_HEADER =
+      Windmill.JobHeader.newBuilder()
+          .setJobId("test_job")
+          .setWorkerId("test_worker")
+          .setProjectId("test_project")
+          .build();
+  private static final String COMPUTATION_ID = "computationId";
+
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  private ManagedChannel inProcessChannel;
+
+  private static Windmill.WorkItemCommitRequest workItemCommitRequest(long value) {
+    return Windmill.WorkItemCommitRequest.newBuilder()
+        .setKey(ByteString.EMPTY)
+        .setShardingKey(value)
+        .setWorkToken(value)
+        .setCacheToken(value)
+        .build();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    Server server =
+        InProcessServerBuilder.forName(FAKE_SERVER_NAME)
+            .fallbackHandlerRegistry(serviceRegistry)
+            .directExecutor()
+            .build()
+            .start();
+
+    inProcessChannel =
+        grpcCleanup.register(
+            InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build());
+    grpcCleanup.register(server);
+    grpcCleanup.register(inProcessChannel);
+  }
+
+  @After
+  public void cleanUp() {
+    inProcessChannel.shutdownNow();
+  }
+
+  private GrpcCommitWorkStream createCommitWorkStream(CommitWorkStreamTestStub testStub) {
+    serviceRegistry.addService(testStub);
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                .build()
+                .createCommitWorkStream(
+                    Clo
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844326533

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
   }
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")
   @Override
-  public void shutdown() {
-    if (isShutdown.compareAndSet(false, true)) {
-      requestObserver()
-          .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+  public final void shutdown() {
+    // Don't lock on "this" before poisoning the request observer since otherwise the observer may
+    // be blocking in send().
+    requestObserver.poison();
+    synchronized (this) {
+      if (!isShutdown) {
+        isShutdown = true;
+        debugMetrics.recordShutdown();
+        shutdownInternal();
+      }
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
-  }
-
-  public static class WindmillStreamShutdownException extends RuntimeException {
-    public WindmillStreamShutdownException(String message) {
-      super(message);
+  protected abstract void shutdownInternal();
+
+  /** Returns true if the stream was torn down and should not be restarted internally. */
+  private synchronized boolean maybeTearDownStream() {
+    if (requestObserver.hasReceivedPoisonPill()

Review Comment:
done; realized we weren't calling `break` on the `WindmillStreamShutdownException` in `startStream()`
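The ordering in `shutdown()` above (poison first, then take the monitor) matters if, as the code comment implies, a sender can be blocked inside a `synchronized` `send()` waiting on flow control. The sketch below reproduces that hazard with JDK types only; `PoisonableStream`, its latch-based `send()`, and the call counter are hypothetical stand-ins for `AbstractWindmillStream` and `ResettableThrowingStreamObserver.poison()`, not Beam code.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of the shutdown ordering; not Beam's AbstractWindmillStream. */
final class PoisonableStream {
  // Released by shutdown(); stands in for poisoning the request observer.
  private final CountDownLatch poisoned = new CountDownLatch(1);
  private boolean isShutdown = false; // guarded by this
  private int shutdownInternalCalls = 0; // guarded by this

  /**
   * Simulates a send blocked on flow control while holding the stream's monitor. In this sketch
   * the transport never becomes ready, so only poisoning releases it. Returns false (not sent).
   */
  synchronized boolean send() throws InterruptedException {
    poisoned.await();
    return false;
  }

  void shutdown() {
    // Poison first, without holding "this": a sender blocked in send() owns the monitor, so
    // entering synchronized (this) first would wait on a thread only poisoning can release.
    poisoned.countDown();
    synchronized (this) {
      if (!isShutdown) {
        isShutdown = true;
        shutdownInternal();
      }
    }
  }

  // Runs at most once per stream, under the monitor.
  private void shutdownInternal() {
    shutdownInternalCalls++;
  }

  synchronized int shutdownInternalCalls() {
    return shutdownInternalCalls;
  }
}
```

If the two steps in `shutdown()` were swapped, a thread blocked in `send()` would keep the monitor and `shutdown()` would hang. With poison first, the sender returns and releases the monitor, and `shutdownInternal()` runs exactly once even when `shutdown()` is called repeatedly.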
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844325871

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
   }
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")

Review Comment:
done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1843484534

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
   }
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")
   @Override
-  public void shutdown() {
-    if (isShutdown.compareAndSet(false, true)) {
-      requestObserver()
-          .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+  public final void shutdown() {
+    // Don't lock on "this" before poisoning the request observer since otherwise the observer may
+    // be blocking in send().
+    requestObserver.poison();
+    synchronized (this) {
+      if (!isShutdown) {
+        isShutdown = true;
+        debugMetrics.recordShutdown();
+        shutdownInternal();
+      }
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
-  }
-
-  public static class WindmillStreamShutdownException extends RuntimeException {
-    public WindmillStreamShutdownException(String message) {
-      super(message);
+  protected abstract void shutdownInternal();
+
+  /** Returns true if the stream was torn down and should not be restarted internally. */
+  private synchronized boolean maybeTearDownStream() {
+    if (requestObserver.hasReceivedPoisonPill()

Review Comment:
What is this `receivedPoisonPill` check guarding against? It is racy because we poison outside of the synchronized block, so we could have:

T1: notices an unrelated stream failure, passes this check because it isn't poisoned yet, starts calling `onNewStream`
T2: calls `shutdown()`, poisons the request observer
T1: calls `requestObserver.reset()` and gets an exception due to the poison

Instead of checking the poison here, it seems like we should just handle the exception from `reset()` failing, as that covers both cases.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
   }
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")

Review Comment:
remove suppression

## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.
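The race scwhittle describes goes away if there is no separate pre-check at all: `reset()` becomes the single point that detects shutdown, and its failure means "do not restart". A minimal sketch, where `ResettableObserver`, `PoisonedException`, and `StreamRestarter` are hypothetical names standing in for `ResettableThrowingStreamObserver`, its shutdown exception, and the restart path in `AbstractWindmillStream`:

```java
/** Hypothetical stand-in for ResettableThrowingStreamObserver; not Beam's class. */
final class ResettableObserver {
  /** Stand-in for the shutdown exception reset() throws once poisoned (assumption). */
  static final class PoisonedException extends Exception {
    PoisonedException() {
      super("Stream was shut down.");
    }
  }

  private boolean isPoisoned = false; // guarded by this
  private int delegateGeneration = 0; // guarded by this

  synchronized void poison() {
    isPoisoned = true;
  }

  /** Installs a fresh delegate for a new physical stream; fails once poisoned. */
  synchronized void reset() throws PoisonedException {
    if (isPoisoned) {
      throw new PoisonedException();
    }
    delegateGeneration++;
  }

  synchronized int delegateGeneration() {
    return delegateGeneration;
  }
}

final class StreamRestarter {
  /**
   * Restarts after an unrelated stream failure. There is deliberately no isPoisoned() pre-check:
   * poison() can run between such a check and reset(), so the exception from reset() is the one
   * place that handles both "already shut down" and "shut down concurrently".
   */
  static boolean tryRestart(ResettableObserver observer) {
    try {
      observer.reset();
      return true;
    } catch (ResettableObserver.PoisonedException e) {
      return false; // Shut down; do not restart.
    }
  }
}
```

Because `poison()` and `reset()` synchronize on the same object, each `reset()` either completes before the poison or observes it, so the T1/T2 interleaving above ends in a clean "don't restart" rather than an unexpected exception.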
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1843416690

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
@@ -132,6 +132,10 @@ private static Optional tryParseDirectEndpointIntoIpV6Address(
             directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
   }
 
+  public final boolean isEmpty() {
+    return equals(none());

Review Comment:
How about changing `none()` to return a singleton instead of building a new instance every time, if we might be calling `isEmpty()` a lot?

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
@@ -104,6 +104,10 @@ synchronized void poison() {
     }
   }
 
+  synchronized boolean hasReceivedPoisonPill() {

Review Comment:
nit: how about `isPoisoned`? A poison pill was a special element we added to queues etc.; to me it's a little confusing to use the term here.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      markClosedOrThrow();
-      outboundObserver.onError(t);
+      if (!isClosed) {

Review Comment:
I think we should have:

isUserClosed = onError or onCompleted called on this object
isClosed = onError or onCompleted called on outboundObserver

Could name them isClosed and isOutboundClosed instead, or something else if clearer. Would be good to comment above them too.

I think that onError/onCompleted should be like:

check(!isUserClosed);
isUserClosed = true;
if (!isClosed) {
  outboundObserver.onError(t);
  isClosed = true;
}

enforcing that only one of them is called.

And terminate should be like:

if (!isClosed) {
  outboundObserver.onError(t);
  isClosed = true;
}

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
       // If the delegate above was already terminated via onError or onComplete from another
       // thread.
       logger.warn("StreamObserver was previously cancelled.", e);
+    } catch (RuntimeException ignored) {
+      logger.warn("StreamObserver was unexpectedly cancelled.", e);

Review Comment:
I was thinking above that you should log both `e` and the currently ignored exception; ditto here. Also, this log message is a little confusing. How about "encountered error {} when cancelling due to error"?

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
@@ -299,24 +303,39 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
+  @CanIgnoreReturnValue
+  private ImmutableList<CompletableFuture<Void>> closeStreamsNotIn(
+      WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    currentBackends.windmillStreams().entrySet().stream()
-        .filter(
-            connectionAndStream ->
-                !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
-        .forEach(
-            entry ->
-                windmillStreamManager.execute(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue())));
+    List<CompletableFuture<Void>> closeStreamFutures =
+        currentBackends.windmillStreams().entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillEndpoints
+                        .windmillEndpoints()
+                        .contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                        windmillStreamManager))
+            .collect(Collectors.toList());
 
     Set newGlobalDataEndpoints =
         new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
-    currentBackends.globalDataStreams().values().stream()
-        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
-        .forEach(
-            sender ->
-                windmillStreamManager.execute(() -> closeStreamSender(sender.endpoint(), sender)));
+    Lis
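scwhittle's two-flag proposal for `DirectStreamObserver` can be written out as follows. This is a sketch under stated assumptions, not the merged code: `SimpleObserver` stands in for gRPC's `StreamObserver` to keep the example dependency-free, and the flags are named `isUserClosed` (onError/onCompleted called on this object) and `isOutboundClosed` (called on `outboundObserver`), mixing the two naming options in the comment.

```java
/** Minimal stand-in for gRPC's StreamObserver (assumption, to stay dependency-free). */
interface SimpleObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

/** Hypothetical sketch of the isUserClosed / isOutboundClosed split for DirectStreamObserver. */
final class TwoFlagObserver<T> implements SimpleObserver<T> {
  private final Object lock = new Object();
  private final SimpleObserver<T> outboundObserver;
  // Set when the user of this object calls onError or onCompleted.
  private boolean isUserClosed = false; // guarded by lock
  // Set once onError or onCompleted has been delivered to outboundObserver.
  private boolean isOutboundClosed = false; // guarded by lock

  TwoFlagObserver(SimpleObserver<T> outboundObserver) {
    this.outboundObserver = outboundObserver;
  }

  @Override
  public void onNext(T value) {
    synchronized (lock) {
      if (isOutboundClosed) {
        throw new IllegalStateException("Outbound observer is closed.");
      }
      outboundObserver.onNext(value);
    }
  }

  @Override
  public void onError(Throwable t) {
    synchronized (lock) {
      markUserClosed();
      closeOutboundWithError(t);
    }
  }

  @Override
  public void onCompleted() {
    synchronized (lock) {
      markUserClosed();
      if (!isOutboundClosed) {
        outboundObserver.onCompleted();
        isOutboundClosed = true;
      }
    }
  }

  /** Internal teardown: may race with the user, so it only closes the outbound side once. */
  void terminate(Throwable t) {
    synchronized (lock) {
      closeOutboundWithError(t);
    }
  }

  private void markUserClosed() {
    // Only one of onError/onCompleted may be called, and only once.
    if (isUserClosed) {
      throw new IllegalStateException("onError or onCompleted was already called.");
    }
    isUserClosed = true;
  }

  private void closeOutboundWithError(Throwable t) {
    if (!isOutboundClosed) {
      outboundObserver.onError(t);
      isOutboundClosed = true;
    }
  }
}
```

`terminate()` skips the user-close check on purpose: internal shutdown can race with the user's own `onCompleted()`, and neither path should deliver a second terminal call to the outbound observer. A user calling `onError`/`onCompleted` twice is still a bug and throws.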
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on PR #32774:
URL: https://github.com/apache/beam/pull/32774#issuecomment-2478169104

back to you @scwhittle thanks!
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841150008

## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserverTest.java:
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.VerifyException;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DirectStreamObserverTest {
+
+  @Test

Review Comment:
done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841042910

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -83,16 +85,14 @@ public void onNext(T value) throws StreamObserverCancelledException {
         // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
         // are synchronized after terminating the phaser if we observe that the phaser is not
         // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-        if (currentPhase < 0) return;
+        if (currentPhase < 0) {
+          throw new StreamObserverCancelledException("StreamObserver was terminated.");
+        }

Review Comment:
done
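The change above turns a silent `return` into an exception once the readiness `Phaser` has been terminated. A minimal sketch of that check with `java.util.concurrent.Phaser`, whose `getPhase()` returns a negative value after `forceTermination()`; `ReadyGate` and `CancelledException` are hypothetical stand-ins for `DirectStreamObserver` and `StreamObserverCancelledException`:

```java
import java.util.concurrent.Phaser;

/** Hypothetical sketch of the onNext termination check; not Beam's DirectStreamObserver. */
final class ReadyGate {
  /** Stand-in for StreamObserverCancelledException (assumption). */
  static final class CancelledException extends RuntimeException {
    CancelledException(String message) {
      super(message);
    }
  }

  // One registered party; each arrive() marks the transport ready and advances the phase.
  private final Phaser isReadyNotifier = new Phaser(1);

  void onReady() {
    isReadyNotifier.arrive();
  }

  void terminate() {
    isReadyNotifier.forceTermination();
  }

  void send(Runnable write) {
    int currentPhase = isReadyNotifier.getPhase();
    // A terminated Phaser reports a negative phase. Throwing (rather than returning) tells the
    // caller the message was not sent instead of dropping it silently.
    if (currentPhase < 0) {
      throw new CancelledException("StreamObserver was terminated.");
    }
    write.run();
  }
}
```

Throwing lets the caller (here, the stream's send path) learn that the message was not delivered and react, for example by treating the stream as closed, instead of assuming the write succeeded.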
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841039667

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
         // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
         // are synchronized after terminating the phaser if we observe that the phaser is not
         // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-        if (currentPhase < 0) return;
+        if (currentPhase < 0) {
+          throw new StreamObserverCancelledException("StreamObserver was terminated.");
+        }

Review Comment:
done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841038480

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
         // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
         // are synchronized after terminating the phaser if we observe that the phaser is not
         // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-        if (currentPhase < 0) return;
+        if (currentPhase < 0) {
+          throw new StreamObserverCancelledException("StreamObserver was terminated.");
+        }
         messagesSinceReady = 0;

Review Comment:
done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841025802

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      isClosed = true;
+      markClosedOrThrow();
       outboundObserver.onError(t);
     }
   }
 
   @Override
   public void onCompleted() {
+    isReadyNotifier.forceTermination();
     synchronized (lock) {
-      isClosed = true;
+      markClosedOrThrow();
       outboundObserver.onCompleted();
     }
   }
 
+  private void markClosedOrThrow() {

Review Comment:
done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1841019493 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java: ## @@ -102,6 +105,48 @@ private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) { return getDataStream; } + @Test + public void testRequestKeyedData() { +GetDataStreamTestStub testStub = +new GetDataStreamTestStub(new TestGetDataStreamRequestObserver()); +GrpcGetDataStream getDataStream = createGetDataStream(testStub); +// These will block until they are successfully sent. +CompletableFuture sendFuture = +CompletableFuture.supplyAsync( +() -> { + try { +return getDataStream.requestKeyedData( +"computationId", +Windmill.KeyedGetDataRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setShardingKey(1) +.setCacheToken(1) +.setWorkToken(1) +.build()); + } catch (WindmillStreamShutdownException e) { +throw new RuntimeException(e); + } +}); + +// Sleep a bit to allow future to run. +Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); Review Comment: done
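The tests quoted here sleep for a fixed 5 seconds so that an asynchronous task has time to be scheduled. Where a test can observe when the task starts, a `CountDownLatch` gives a bounded, event-driven wait instead. This is a generic sketch, not the PR's test code: `LatchInsteadOfSleep` is a hypothetical name, and a second latch stands in for the blocking `requestKeyedData` call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a latch-based wait in place of a fixed sleep.
final class LatchInsteadOfSleep {
  // Returns true if the async task was observed running (and still blocked) before release.
  static boolean taskObservedRunning() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    CompletableFuture<Void> task =
        CompletableFuture.runAsync(
            () -> {
              started.countDown(); // signal that the task has been scheduled and is running
              try {
                release.await(); // stands in for a call that blocks until its request is sent
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    try {
      // Bounded wait for the signal instead of sleeping a fixed 5 seconds.
      boolean didStart = started.await(10, TimeUnit.SECONDS);
      return didStart && !task.isDone();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      release.countDown();
      task.join();
    }
  }
}
```

A latch only proves the task began running; if a test needs the task to be parked inside a specific blocking call, that call has to expose its own signal.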
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1841019232 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java: ## @@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn assertThrows( WindmillStreamShutdownException.class, queuedBatch::waitForSendOrFailNotification)); - +// Wait a few seconds for the above future to get scheduled and run. +Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1841018880 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java: ## @@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException { commitProcessed.countDown(); }); } +} catch (StreamObserverCancelledException ignored) { } // Verify that we sent the commits above in a request + the initial header. -verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class)); +verify(requestObserver, times(2)) +.onNext( +argThat( +request -> { + if (request.getHeader().equals(TEST_JOB_HEADER)) { Review Comment: done, used `InOrder`
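The single `argThat` matcher quoted here would also accept two headers, which is why the author switched to Mockito's `InOrder`. As a stdlib-only illustration of the same ordering check, without Mockito, a recording observer can assert the exact request sequence. `FakeCommitRequest` and `RecordingRequestObserver` are hypothetical simplifications, not Windmill types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for Windmill.StreamingCommitWorkRequest.
final class FakeCommitRequest {
  final boolean hasHeader;
  final List<Long> commitChunks;

  FakeCommitRequest(boolean hasHeader, List<Long> commitChunks) {
    this.hasHeader = hasHeader;
    this.commitChunks = commitChunks;
  }
}

// Records requests in arrival order so a test can check their exact sequence.
final class RecordingRequestObserver {
  private final List<FakeCommitRequest> received =
      Collections.synchronizedList(new ArrayList<>());

  void onNext(FakeCommitRequest request) {
    received.add(request);
  }

  // True only for exactly [header, non-empty commit batch]; two headers do not pass.
  boolean sawHeaderThenCommits() {
    synchronized (received) {
      return received.size() == 2
          && received.get(0).hasHeader
          && received.get(0).commitChunks.isEmpty()
          && !received.get(1).hasHeader
          && !received.get(1).commitChunks.isEmpty();
    }
  }
}
```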
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1841011999 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException { public void onError(Throwable t) { isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onError(t); } } @Override public void onCompleted() { +isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onCompleted(); } } + private void markClosedOrThrow() { +synchronized (lock) { + Preconditions.checkState(!isClosed); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1841012413 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java: ## @@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {} () -> assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1))); testStream.shutdown(); + +// Sleep a bit to give sendExecutor time to execute the send(). +Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1841002762 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) { for (int i = 0; i < chunk.getRequestIdCount(); ++i) { AppendableInputStream responseStream = pending.get(chunk.getRequestId(i)); - verify(responseStream != null, "No pending response stream"); + synchronized (this) { Review Comment: done
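The hunk quoted here replaces the unconditional `verify(responseStream != null, ...)` in `onResponse`. A response for a request id that is no longer pending is tolerable after shutdown and a bug otherwise; the review suggested taking the lock only on that unexpected path and then skipping the entry. A minimal sketch of that pattern follows, with hypothetical names: a `StringBuilder` stands in for `AppendableInputStream`, and `IllegalStateException` stands in for Guava's `verify`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified response dispatcher; not the PR's GrpcGetDataStream.
final class ResponseDispatcher {
  private final Map<Long, StringBuilder> pending = new ConcurrentHashMap<>();
  private boolean isShutdown = false; // guarded by "this"

  void register(long requestId) {
    pending.put(requestId, new StringBuilder());
  }

  synchronized void shutdown() {
    isShutdown = true;
    pending.clear();
  }

  // Appends data to each pending response; unknown ids are tolerated only after shutdown.
  void onResponse(List<Long> requestIds, String data) {
    for (long id : requestIds) {
      StringBuilder response = pending.get(id);
      if (response == null) {
        // Lock only on the unexpected path, then skip the entry instead of dereferencing null.
        synchronized (this) {
          if (!isShutdown) {
            throw new IllegalStateException("No pending response stream for request " + id);
          }
        }
        continue;
      }
      response.append(data);
    }
  }

  String responseFor(long id) {
    StringBuilder response = pending.get(id);
    return response == null ? null : response.toString();
  }
}
```

The common path stays lock-free because the concurrent map lookup needs no monitor; only the rare null case pays for synchronization.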
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1840996524 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java: ## @@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce try { delegate.onError(e); - } catch (RuntimeException ignored) { + } catch (IllegalStateException ignored) { Review Comment: done
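The hunk quoted here narrows the swallowed exception from `RuntimeException` to `IllegalStateException`. One standard way to keep a swallowed cleanup failure visible for debugging is `Throwable.addSuppressed`, which attaches it to the primary exception being propagated. The sketch below is illustrative only: `SuppressedOnCleanup` is a hypothetical name, and the `Runnable` stands in for `delegate.onError(e)`.

```java
// Hypothetical helper: attach a failure from a best-effort cleanup to the primary exception.
final class SuppressedOnCleanup {
  static RuntimeException withCleanup(RuntimeException primary, Runnable cleanup) {
    try {
      cleanup.run(); // stands in for delegate.onError(e)
    } catch (IllegalStateException cleanupFailure) {
      // Keep the secondary failure attached to the primary one instead of dropping it.
      primary.addSuppressed(cleanupFailure);
    }
    return primary;
  }
}
```

The suppressed exception is printed under "Suppressed:" in the primary exception's stack trace, so it shows up in logs without changing which exception the caller sees.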
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1840157947 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java: ## @@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce try { delegate.onError(e); - } catch (RuntimeException ignored) { + } catch (IllegalStateException ignored) { Review Comment: Add the ignored exception to the exception too? Just in case we catch something unexpected, it would help debugging. ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException { public void onError(Throwable t) { isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onError(t); } } @Override public void onCompleted() { +isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onCompleted(); } } + private void markClosedOrThrow() { +synchronized (lock) { + Preconditions.checkState(!isClosed); Review Comment: This is going to throw if we have the sequence T1: onNext(), T2: terminate(), T1: onError(). It seems like that could happen if we're terminating from threads other than the one generally driving the observer. We could have a separate bool tracking whether the observer was userClosed, and base this exception on that instead, since that would indicate the class is being used wrong. Having a terminate before/during an onCompleted/onError doesn't necessarily seem like misuse, and I think we should avoid throwing an exception.
## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException { public void onError(Throwable t) { isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onError(t); } } @Override public void onCompleted() { +isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onCompleted(); } } + private void markClosedOrThrow() { Review Comment: Just make the method synchronized (though with the suggestion below, it's probably easier to just duplicate this in onCompleted/onError and use the same if block). ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) { for (int i = 0; i < chunk.getRequestIdCount(); ++i) { AppendableInputStream responseStream = pending.get(chunk.getRequestId(i)); - verify(responseStream != null, "No pending response stream"); + synchronized (this) { Review Comment: Avoid synchronizing if not necessary, and you need to handle the null case or you will just get an exception on the following line: if (responseStream == null) { synchronized (this) { verify(isShutdown); } continue; } ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java: ## @@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException { commitProcessed.countDown(); }); } +} catch (StreamObserverCancelledException ignored) { } // Verify that we sent the commits above in a request + the initial header.
-verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class)); +verify(requestObserver, times(2)) +.onNext( +argThat( +request -> { + if (request.getHeader().equals(TEST_JOB_HEADER)) { Review Comment: nit: how about separate onNext verifications? As is, 2 headers would pass. May need some sequence object, not sure. .onNext(argThat(r -> r.getHeader().equals(TEST_JOB_HEADER))) .onNext(argThat(r -> !r.getCommitChunkList().isEmpty())) ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException { // Phaser is terminated so don't us
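A minimal sketch of the userClosed suggestion in the review: keep a separate flag for caller-initiated closes so that only closing twice throws, while a `terminate()` that races with `onCompleted()` is tolerated. All names are hypothetical; only `onCompleted()` is shown (`onError()` would follow the same pattern), and a list of strings stands in for the wrapped `outboundObserver`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a caller closing twice is misuse, but an internal terminate()
// racing with the caller's onCompleted() is not.
final class UserClosedTrackingObserver {
  private final Object lock = new Object();
  private final List<String> outbound = new ArrayList<>(); // stands in for outboundObserver
  private boolean isClosed = false; // true once anything has closed the outbound side
  private boolean isUserClosed = false; // true once the caller invoked onCompleted()

  // May be called from another thread, e.g. during stream shutdown.
  void terminate(Throwable reason) {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        outbound.add("onError: " + reason.getMessage());
      }
    }
  }

  void onCompleted() {
    synchronized (lock) {
      if (isUserClosed) {
        throw new IllegalStateException("onCompleted()/onError() already called.");
      }
      isUserClosed = true;
      // If terminate() already closed the outbound side, skip it rather than throwing.
      if (!isClosed) {
        isClosed = true;
        outbound.add("onCompleted");
      }
    }
  }

  List<String> outboundCalls() {
    synchronized (lock) {
      return new ArrayList<>(outbound);
    }
  }
}
```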
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839829444 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.InOrder; + +@RunWith(JUnit4.class) +public class GrpcCommitWorkStreamTest { + private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest"; + private static final 
Windmill.JobHeader TEST_JOB_HEADER = + Windmill.JobHeader.newBuilder() + .setJobId("test_job") + .setWorkerId("test_worker") + .setProjectId("test_project") + .build(); + private static final String COMPUTATION_ID = "computationId"; + + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); + private ManagedChannel inProcessChannel; + + private static Windmill.WorkItemCommitRequest workItemCommitRequest(long value) { +return Windmill.WorkItemCommitRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setShardingKey(value) +.setWorkToken(value) +.setCacheToken(value) +.build(); + } + + @Before + public void setUp() throws IOException { +Server server = +InProcessServerBuilder.forName(FAKE_SERVER_NAME) +.fallbackHandlerRegistry(serviceRegistry) +.directExecutor() +.build() +.start(); + +inProcessChannel = +grpcCleanup.register( + InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build()); +grpcCleanup.register(server); +grpcCleanup.register(inProcessChannel); + } + + @After + public void cleanUp() { +inProcessChannel.shutdownNow(); + } + + private GrpcCommitWorkStream createCommitWorkStream(CommitWorkStreamTestStub testStub) { +serviceRegistry.addService(testStub); +GrpcCommitWorkStream commitWorkStream = +(GrpcCommitWorkStream) +GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) +.build() +.createCommitWorkStream( +CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel), +new
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839908734 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -342,62 +401,86 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept batch.addRequest(request); } if (responsibleForSend) { - if (waitForSendLatch == null) { + if (prevBatch == null) { // If there was not a previous batch wait a little while to improve // batching. -Thread.sleep(1); +sleeper.sleep(1); } else { -waitForSendLatch.await(); +prevBatch.waitForSendOrFailNotification(); } // Finalize the batch so that no additional requests will be added. Leave the batch in the // queue so that a subsequent batch will wait for its completion. - synchronized (batches) { -verify(batch == batches.peekFirst()); + synchronized (this) { +if (isShutdown) { + throw shutdownExceptionFor(batch); +} + +verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send()."); batch.markFinalized(); } - sendBatch(batch.requests()); - synchronized (batches) { -verify(batch == batches.pollFirst()); + trySendBatch(batch); +} else { + // Wait for this batch to be sent before parsing the response. + batch.waitForSendOrFailNotification(); +} + } + + void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException { +try { + sendBatch(batch); + synchronized (this) { +if (isShutdown) { + throw shutdownExceptionFor(batch); +} + +verify( +batch == batches.pollFirst(), +"Sent GetDataStream request batch removed before send() was complete."); } // Notify all waiters with requests in this batch as well as the sender // of the next batch (if one exists). - batch.countDown(); -} else { - // Wait for this batch to be sent before parsing the response. - batch.await(); + batch.notifySent(); +} catch (Exception e) { + // Free waiters if the send() failed. 
+ batch.notifyFailed(); + // Propagate the exception to the calling thread. + throw e; } } - @SuppressWarnings("NullableProblems") - private void sendBatch(List requests) { -StreamingGetDataRequest batchedRequest = flushToBatch(requests); + private void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException { +if (batch.isEmpty()) { + return; +} + +// Synchronization of pending inserts is necessary with send to ensure duplicates are not +// sent on stream reconnect. synchronized (this) { - // Synchronization of pending inserts is necessary with send to ensure duplicates are not - // sent on stream reconnect. - for (QueuedRequest request : requests) { + if (isShutdown) { +throw shutdownExceptionFor(batch); + } + + for (QueuedRequest request : batch.requestsReadOnly()) { // Map#put returns null if there was no previous mapping for the key, meaning we have not // seen it before. -verify(pending.put(request.id(), request.getResponseStream()) == null); +verify( +pending.put(request.id(), request.getResponseStream()) == null, +"Request already sent."); } - try { -send(batchedRequest); - } catch (IllegalStateException e) { + + if (!trySend(batch.asGetDataRequest())) { // The stream broke before this call went through; onNewStream will retry the fetch. -LOG.warn("GetData stream broke before call started.", e); +LOG.warn("GetData stream broke before call started."); } } } - @SuppressWarnings("argument") - private StreamingGetDataRequest flushToBatch(List requests) { -// Put all global data requests first because there is only a single repeated field for -// request ids and the initial ids correspond to global data requests if they are present. 
-requests.sort(QueuedRequest.globalRequestsFirst()); -StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder(); -for (QueuedRequest request : requests) { - request.addToStreamingGetDataRequest(builder); -} -return builder.build(); + private synchronized void verify(boolean condition, String message) { +Verify.verify(condition || isShutdown, message); + } + + private synchronized boolean isShutdownLocked() { Review Comment: done
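In the hunk quoted here, each batch's waiters (and the sender of the next batch) block in `waitForSendOrFailNotification()` until `trySendBatch` calls either `notifySent()` or `notifyFailed()`. A minimal sketch of such a one-shot notification built on a `CountDownLatch` follows. It is not the PR's `QueuedBatch`; `BatchNotification` and `BatchFailedException` (standing in for `WindmillStreamShutdownException`) are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for WindmillStreamShutdownException.
class BatchFailedException extends Exception {
  BatchFailedException(String message) {
    super(message);
  }
}

// Sketch of a one-shot "sent or failed" notification shared by every waiter on a batch.
final class BatchNotification {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean failed = false;

  void notifySent() {
    done.countDown();
  }

  void notifyFailed() {
    failed = true; // written before countDown(), so woken waiters observe it
    done.countDown();
  }

  // Blocks until notifySent() or notifyFailed() is called; throws if the send failed.
  void waitForSendOrFailNotification() throws InterruptedException, BatchFailedException {
    done.await();
    if (failed) {
      throw new BatchFailedException("Batch was not sent; the stream is shutting down.");
    }
  }
}
```

Because the latch is released on both paths, a failed send can never leave waiters blocked forever, which is what the `catch` block calling `batch.notifyFailed()` in the hunk guards against.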
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839912127 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -342,62 +401,86 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept batch.addRequest(request); } if (responsibleForSend) { - if (waitForSendLatch == null) { + if (prevBatch == null) { // If there was not a previous batch wait a little while to improve // batching. -Thread.sleep(1); +sleeper.sleep(1); } else { -waitForSendLatch.await(); +prevBatch.waitForSendOrFailNotification(); } // Finalize the batch so that no additional requests will be added. Leave the batch in the // queue so that a subsequent batch will wait for its completion. - synchronized (batches) { -verify(batch == batches.peekFirst()); + synchronized (this) { +if (isShutdown) { + throw shutdownExceptionFor(batch); +} + +verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send()."); batch.markFinalized(); } - sendBatch(batch.requests()); - synchronized (batches) { -verify(batch == batches.pollFirst()); + trySendBatch(batch); +} else { + // Wait for this batch to be sent before parsing the response. + batch.waitForSendOrFailNotification(); +} + } + + void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException { +try { + sendBatch(batch); + synchronized (this) { +if (isShutdown) { + throw shutdownExceptionFor(batch); +} + +verify( +batch == batches.pollFirst(), +"Sent GetDataStream request batch removed before send() was complete."); } // Notify all waiters with requests in this batch as well as the sender // of the next batch (if one exists). - batch.countDown(); -} else { - // Wait for this batch to be sent before parsing the response. - batch.await(); + batch.notifySent(); +} catch (Exception e) { + // Free waiters if the send() failed. 
+ batch.notifyFailed(); + // Propagate the exception to the calling thread. + throw e; } } - @SuppressWarnings("NullableProblems") - private void sendBatch(List requests) { -StreamingGetDataRequest batchedRequest = flushToBatch(requests); + private void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException { +if (batch.isEmpty()) { + return; +} + +// Synchronization of pending inserts is necessary with send to ensure duplicates are not +// sent on stream reconnect. synchronized (this) { - // Synchronization of pending inserts is necessary with send to ensure duplicates are not - // sent on stream reconnect. - for (QueuedRequest request : requests) { + if (isShutdown) { +throw shutdownExceptionFor(batch); + } + + for (QueuedRequest request : batch.requestsReadOnly()) { // Map#put returns null if there was no previous mapping for the key, meaning we have not // seen it before. -verify(pending.put(request.id(), request.getResponseStream()) == null); +verify( +pending.put(request.id(), request.getResponseStream()) == null, +"Request already sent."); } - try { -send(batchedRequest); - } catch (IllegalStateException e) { + + if (!trySend(batch.asGetDataRequest())) { // The stream broke before this call went through; onNewStream will retry the fetch. -LOG.warn("GetData stream broke before call started.", e); +LOG.warn("GetData stream broke before call started."); } } } - @SuppressWarnings("argument") - private StreamingGetDataRequest flushToBatch(List requests) { -// Put all global data requests first because there is only a single repeated field for -// request ids and the initial ids correspond to global data requests if they are present. 
-requests.sort(QueuedRequest.globalRequestsFirst()); -StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder(); -for (QueuedRequest request : requests) { - request.addToStreamingGetDataRequest(builder); -} -return builder.build(); + private synchronized void verify(boolean condition, String message) { Review Comment: done ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -144,41 +149,57 @@ protected boolean hasPendingRequests() { } @Override - public void sendHealthCheck() { + public void sendHealthCheck() throws WindmillStreamShutdownException { if (has
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839919732 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,129 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); +// If getPhase() returns a value less than 0, the phaser has been terminated. +if (awaitPhase < 0) { + return; +} + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { -awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. -phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +int nextPhase = +isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +// If nextPhase is a value less than 0, the phaser has been terminated. +if (nextPhase < 0) { Review Comment: added checks under `lock` to `onError` and `onCompleted`; all `onNext` calls are already guarded by `lock`.
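The loop quoted here polls `isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS)` and treats a negative result as termination. A simplified sketch of that waiting step follows. It assumes a fixed one-second poll and a caller-supplied limit, whereas the quoted loop declares `totalSecondsWaited` and `waitSeconds` and enforces an overall timeout related to the stream deadline. `ReadyWaiter` is a hypothetical name.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified version of the "wait for isReady" step.
final class ReadyWaiter {
  // Returns true if the phaser advanced past awaitPhase, false if it was terminated.
  // Throws TimeoutException after maxWaitSeconds one-second polls without progress.
  static boolean awaitReady(Phaser isReadyNotifier, int awaitPhase, long maxWaitSeconds)
      throws InterruptedException, TimeoutException {
    long secondsWaited = 0;
    while (true) {
      try {
        int nextPhase = isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, 1, TimeUnit.SECONDS);
        // A negative result means the phaser was terminated (e.g. via forceTermination()).
        return nextPhase >= 0;
      } catch (TimeoutException e) {
        secondsWaited++;
        if (secondsWaited >= maxWaitSeconds) {
          throw e;
        }
      }
    }
  }
}
```

`awaitAdvanceInterruptibly` returns immediately when the current phase already differs from `awaitPhase`, which is why the real code observes the phase before checking `isReady()`: an advance that happens in between is not missed.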
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839901332 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GrpcGetDataStreamTest { + private static final String FAKE_SERVER_NAME = "Fake server for 
GrpcGetDataStreamTest"; + private static final Windmill.JobHeader TEST_JOB_HEADER = + Windmill.JobHeader.newBuilder() + .setJobId("test_job") + .setWorkerId("test_worker") + .setProjectId("test_project") + .build(); + + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws IOException { +Server server = +InProcessServerBuilder.forName(FAKE_SERVER_NAME) +.fallbackHandlerRegistry(serviceRegistry) +.directExecutor() +.build() +.start(); + +inProcessChannel = +grpcCleanup.register( + InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build()); +grpcCleanup.register(server); +grpcCleanup.register(inProcessChannel); + } + + @After + public void cleanUp() { +inProcessChannel.shutdownNow(); + } + + private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) { +serviceRegistry.addService(testStub); +GrpcGetDataStream getDataStream = +(GrpcGetDataStream) +GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) +.setSendKeyedGetDataRequests(false) +.build() +.createGetDataStream( +CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel), +new ThrottleTimer()); +getDataStream.start(); +return getDataStream; + } + + @Test + public void testRequestKeyedData_sendOnShutdownStreamThrowsWindmillStreamShutdownException() { Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on PR #32774: URL: https://github.com/apache/beam/pull/32774#issuecomment-2473107804 Still need to add some more tests to `DirectStreamObserverTest`. Addressed the other comments, thanks! @scwhittle
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839902664 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java: ## @@ -234,8 +229,13 @@ public void appendSpecificHtml(PrintWriter writer) { } @Override - public void sendHealthCheck() { -send(HEALTH_CHECK_REQUEST); + public void sendHealthCheck() throws WindmillStreamShutdownException { +trySend(HEALTH_CHECK_REQUEST); + } + + @Override + protected void shutdownInternal() { +workItemAssemblers.clear(); Review Comment: removed
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839907045 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -271,12 +297,26 @@ public void onHeartbeatResponse(List resp } @Override - public void sendHealthCheck() { + public void sendHealthCheck() throws WindmillStreamShutdownException { if (hasPendingRequests()) { - send(StreamingGetDataRequest.newBuilder().build()); + trySend(HEALTH_CHECK_REQUEST); } } + @Override + protected void shutdownInternal() { +// Stream has been explicitly closed. Drain pending input streams and request batches. +// Future calls to send RPCs will fail. +pending.values().forEach(AppendableInputStream::cancel); +pending.clear(); +batches.forEach( +batch -> { + batch.markFinalized(); Review Comment: done ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -301,39 +341,58 @@ public void appendSpecificHtml(PrintWriter writer) { writer.append("]"); } - private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) { -while (true) { + private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) + throws WindmillStreamShutdownException { +while (!isShutdownLocked()) { request.resetResponseStream(); try { queueRequestAndWait(request); return parseFn.parse(request.getResponseStream()); - } catch (CancellationException e) { -// Retry issuing the request since the response stream was cancelled. 
-continue; + } catch (AppendableInputStream.InvalidInputStreamStateException | CancellationException e) { +handleShutdown(request, e); +if (!(e instanceof CancellationException)) { + throw e; +} } catch (IOException e) { LOG.error("Parsing GetData response failed: ", e); -continue; } catch (InterruptedException e) { Thread.currentThread().interrupt(); +handleShutdown(request, e); throw new RuntimeException(e); } finally { pending.remove(request.id()); } } + +throw shutdownExceptionFor(request); } - private void queueRequestAndWait(QueuedRequest request) throws InterruptedException { + private synchronized void handleShutdown(QueuedRequest request, Throwable cause) Review Comment: done
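The `shutdownInternal()` change quoted above drains the stream on shutdown: it cancels every entry in `pending`, clears the map, and finalizes queued batches so that blocked callers are released and later sends fail. A minimal sketch of the drain-on-shutdown part only, assuming `CompletableFuture<String>` as a stand-in for `AppendableInputStream`; `PendingRequests`, `register`, and `pendingCount` are hypothetical names, not Beam APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: in-flight responses keyed by request id, drained on shutdown. */
class PendingRequests {
  private final ConcurrentMap<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
  private boolean isShutdown = false; // guarded by this

  /** Registers a response slot, or fails it immediately if the stream is already shut down. */
  synchronized CompletableFuture<String> register(long id) {
    CompletableFuture<String> response = new CompletableFuture<>();
    if (isShutdown) {
      response.completeExceptionally(new IllegalStateException("Stream is shut down."));
      return response;
    }
    pending.put(id, response);
    return response;
  }

  /** Cancels every in-flight response so waiting callers wake up, then clears the map. */
  synchronized void shutdown() {
    isShutdown = true;
    pending.values().forEach(response -> response.cancel(true));
    pending.clear();
  }

  int pendingCount() {
    return pending.size();
  }
}
```

Cancelling rather than silently dropping the entries matters because a caller blocked on a response would otherwise wait forever once nothing will ever complete it.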
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839906288 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -187,21 +208,26 @@ private long uniqueId() { } @Override - public KeyedGetDataResponse requestKeyedData(String computation, KeyedGetDataRequest request) { + public KeyedGetDataResponse requestKeyedData(String computation, KeyedGetDataRequest request) + throws WindmillStreamShutdownException { return issueRequest( QueuedRequest.forComputation(uniqueId(), computation, request), KeyedGetDataResponse::parseFrom); } @Override - public GlobalData requestGlobalData(GlobalDataRequest request) { + public GlobalData requestGlobalData(GlobalDataRequest request) + throws WindmillStreamShutdownException { return issueRequest(QueuedRequest.global(uniqueId(), request), GlobalData::parseFrom); } @Override - public void refreshActiveWork(Map> heartbeats) { -if (isShutdown()) { - throw new WindmillStreamShutdownException("Unable to refresh work for shutdown stream."); + public void refreshActiveWork(Map> heartbeats) + throws WindmillStreamShutdownException { +synchronized (this) { + if (isShutdown) { Review Comment: done ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -301,39 +341,58 @@ public void appendSpecificHtml(PrintWriter writer) { writer.append("]"); } - private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) { -while (true) { + private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) + throws WindmillStreamShutdownException { +while (!isShutdownLocked()) { Review Comment: done
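The `issueRequest()` change replaces `while (true)` with a loop that retries when the response stream is cancelled but stops once shutdown is observed, then throws `shutdownExceptionFor(request)`. The sketch below shows only that control flow, under stated assumptions: `RetryingIssuer` and `RequestShutdownException` are hypothetical stand-ins for the stream and `WindmillStreamShutdownException`, an `AtomicBoolean` replaces `isShutdownLocked()`, and the `Supplier` stands in for queueing the request and parsing its response.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical stand-in for WindmillStreamShutdownException. */
class RequestShutdownException extends Exception {
  RequestShutdownException(String message) {
    super(message);
  }
}

/** Sketch of the issueRequest() loop: retry on cancellation until shutdown is observed. */
class RetryingIssuer {
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

  void shutdown() {
    isShutdown.set(true);
  }

  <T> T issueRequest(String request, Supplier<T> attempt) throws RequestShutdownException {
    while (!isShutdown.get()) {
      try {
        return attempt.get();
      } catch (CancellationException e) {
        // The response stream was cancelled (for example by a stream restart).
        // Loop and retry unless the stream has been shut down in the meantime.
      }
    }
    // Surface a checked error instead of spinning forever on a closed stream.
    throw new RequestShutdownException("Cannot send request=[" + request + "] on closed stream.");
  }
}
```

The checked exception forces callers such as `requestKeyedData` to declare `throws WindmillStreamShutdownException`, which is the signature change visible in the diff.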
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839903881 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -121,32 +131,44 @@ public static GrpcGetDataStream create( int streamingRpcBatchLimit, boolean sendKeyedGetDataRequests, Consumer> processHeartbeatResponses) { -GrpcGetDataStream getDataStream = -new GrpcGetDataStream( -backendWorkerToken, -startGetDataRpcFn, -backoff, -streamObserverFactory, -streamRegistry, -logEveryNStreamFailures, -getDataThrottleTimer, -jobHeader, -idGenerator, -streamingRpcBatchLimit, -sendKeyedGetDataRequests, -processHeartbeatResponses); -getDataStream.startStream(); -return getDataStream; +return new GrpcGetDataStream( +backendWorkerToken, +startGetDataRpcFn, +backoff, +streamObserverFactory, +streamRegistry, +logEveryNStreamFailures, +getDataThrottleTimer, +jobHeader, +idGenerator, +streamingRpcBatchLimit, +sendKeyedGetDataRequests, +processHeartbeatResponses); + } + + private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) { +return new WindmillStreamShutdownException( +"Stream was closed when attempting to send " + batch.requestsCount() + " requests."); + } + + private static WindmillStreamShutdownException shutdownExceptionFor(QueuedRequest request) { +return new WindmillStreamShutdownException( +"Cannot send request=[" + request + "] on closed stream."); + } + + private void sendIgnoringClosed(StreamingGetDataRequest getDataRequest) + throws WindmillStreamShutdownException { +trySend(getDataRequest); } @Override - protected synchronized void onNewStream() { -send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build()); -if (clientClosed.get()) { + protected synchronized void onNewStream() throws WindmillStreamShutdownException { +trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build()); +if (clientClosed) { // We rely on 
close only occurring after all methods on the stream have returned. // Since the requestKeyedData and requestGlobalData methods are blocking this // means there should be no pending requests. - verify(!hasPendingRequests()); + verify(!hasPendingRequests(), "Pending requests not expected on stream restart."); Review Comment: done
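With this change `create()` returns the stream without calling `startStream()`, and the new tests call `start()` explicitly after construction. A hypothetical sketch of that construct-then-start lifecycle: the `LifecycleStream` class and its three states are illustrative only, and returning `false` after shutdown is a simplification (in the diff, callers of `trySend()` declare `throws WindmillStreamShutdownException`).

```java
/** Hypothetical sketch of a stream whose constructor does not open the RPC. */
class LifecycleStream {
  private enum State {
    NOT_STARTED,
    STARTED,
    SHUT_DOWN
  }

  private State state = State.NOT_STARTED; // guarded by this

  /** One-way transition; calling start() again, or after shutdown(), does nothing. */
  synchronized void start() {
    if (state == State.NOT_STARTED) {
      state = State.STARTED;
      // A real stream would open the RPC here and send its header, as onNewStream() does.
    }
  }

  synchronized void shutdown() {
    state = State.SHUT_DOWN;
  }

  /** Rejects sends before start(); returns false instead of sending once shut down. */
  synchronized boolean trySend(String request) {
    if (state == State.NOT_STARTED) {
      throw new IllegalStateException("start() must be called before sending " + request);
    }
    return state == State.STARTED;
  }
}
```

Keeping construction side-effect free lets a caller register or wire up a stream before any RPC traffic happens, and lets shutdown race safely with a start that has not run yet.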
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839882212 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java: ## @@ -460,8 +470,9 @@ private void flushResponse() { "Sending batched response of {} ids", responseBuilder.getRequestIdCount()); try { responseObserver.onNext(responseBuilder.build()); - } catch (IllegalStateException e) { + } catch (Exception e) { // Stream is already closed. +System.out.println("trieu: " + e); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839839690 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.InOrder; + +@RunWith(JUnit4.class) +public class GrpcCommitWorkStreamTest { + private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest"; + private static final 
Windmill.JobHeader TEST_JOB_HEADER = + Windmill.JobHeader.newBuilder() + .setJobId("test_job") + .setWorkerId("test_worker") + .setProjectId("test_project") + .build(); + private static final String COMPUTATION_ID = "computationId"; + + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); + private ManagedChannel inProcessChannel; + + private static Windmill.WorkItemCommitRequest workItemCommitRequest(long value) { +return Windmill.WorkItemCommitRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setShardingKey(value) +.setWorkToken(value) +.setCacheToken(value) +.build(); + } + + @Before + public void setUp() throws IOException { +Server server = +InProcessServerBuilder.forName(FAKE_SERVER_NAME) +.fallbackHandlerRegistry(serviceRegistry) +.directExecutor() +.build() +.start(); + +inProcessChannel = +grpcCleanup.register( + InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build()); +grpcCleanup.register(server); +grpcCleanup.register(inProcessChannel); + } + + @After + public void cleanUp() { +inProcessChannel.shutdownNow(); + } + + private GrpcCommitWorkStream createCommitWorkStream(CommitWorkStreamTestStub testStub) { +serviceRegistry.addService(testStub); +GrpcCommitWorkStream commitWorkStream = +(GrpcCommitWorkStream) +GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) +.build() +.createCommitWorkStream( +CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel), +new
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839868939 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GrpcGetDataStreamRequestsTest { + + @Test + public void testQueuedRequest_globalRequestsFirstComparator() { +List requests = new ArrayList<>(); +Windmill.KeyedGetDataRequest keyedGetDataRequest1 = +Windmill.KeyedGetDataRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setCacheToken(1L) +.setShardingKey(1L) +.setWorkToken(1L) +.setMaxBytes(Long.MAX_VALUE) +.build(); +requests.add( +GrpcGetDataStreamRequests.QueuedRequest.forComputation( +1, "computation1", keyedGetDataRequest1)); + +Windmill.KeyedGetDataRequest keyedGetDataRequest2 = +Windmill.KeyedGetDataRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setCacheToken(2L) +.setShardingKey(2L) +.setWorkToken(2L) +.setMaxBytes(Long.MAX_VALUE) +.build(); +requests.add( +GrpcGetDataStreamRequests.QueuedRequest.forComputation( +2, "computation2", keyedGetDataRequest2)); + +Windmill.GlobalDataRequest globalDataRequest = +Windmill.GlobalDataRequest.newBuilder() +.setDataId( +Windmill.GlobalDataId.newBuilder() +.setTag("globalData") +.setVersion(ByteString.EMPTY) +.build()) +.setComputationId("computation1") +.build(); +requests.add(GrpcGetDataStreamRequests.QueuedRequest.global(3, globalDataRequest)); + + requests.sort(GrpcGetDataStreamRequests.QueuedRequest.globalRequestsFirst()); + +// First one should be the global request. 
+assertTrue(requests.get(0).getDataRequest().isGlobal()); + } + + @Test + public void testQueuedBatch_asGetDataRequest() { +GrpcGetDataStreamRequests.QueuedBatch queuedBatch = new GrpcGetDataStreamRequests.QueuedBatch(); + +Windmill.KeyedGetDataRequest keyedGetDataRequest1 = +Windmill.KeyedGetDataRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setCacheToken(1L) +.setShardingKey(1L) +.setWorkToken(1L) +.setMaxBytes(Long.MAX_VALUE) +.build(); +queuedBatch.addRequest( +GrpcGetDataStreamRequests.QueuedRequest.forComputation( +1, "computation1", keyedGetDataRequest1)); + +Windmill.KeyedGetDataRequest keyedGetDataRequest2 = +Windmill.KeyedGetDataRequest.newBuilder() +.setKey(ByteString.EMPTY) +.setCacheToken(2L) +.setShardingKey(2L) +.setWorkToken(2L) +.setMaxBytes(Long.MAX_VALUE) +.build(); +queuedBatch.addRequest( +GrpcGetDataStreamRequests.QueuedRequest.forComputation( +2, "computation2", keyedGetDataRequest2)); + +Windmill.GlobalDataRequest globalDataRequest = +Windmill.GlobalDataRequest.newBuilder() +.setDataId( +Windmill.GlobalDataId.newBuilder() +.setTag("globalData") +.setVersion(ByteString.EMPTY) +.build()) +.setComputationId("computation1") +.build
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1839380434 ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import java.io.PrintWriter; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +public class AbstractWindmillStreamTest { + private static final long DEADLINE_SECONDS = 10; + private final Set> streamRegistry = ConcurrentHashMap.newKeySet(); + private final StreamObserverFactory streamObserverFactory = + StreamObserverFactory.direct(DEADLINE_SECONDS, 1); + + @Before + public void setUp() { +streamRegistry.clear(); + } + + private TestStream newStream( + Function, StreamObserver> clientFactory) { +return new TestStream(clientFactory, streamRegistry, streamObserverFactory); + } + + @Test + public void testShutdown_notBlockedBySend() throws InterruptedException, ExecutionException { +CountDownLatch sendBlocker = new CountDownLatch(1); +Function, StreamObserver> clientFactory = +ignored -> +new CallStreamObserver() { + @Override + public void onNext(Integer integer) { +try { + sendBlocker.await(); +} catch (InterruptedException e) { + throw new RuntimeException(e); +} + } + + @Override + public void 
onError(Throwable throwable) {} + + @Override + public void onCompleted() {} + + @Override + public boolean isReady() { +return false; + } + + @Override + public void setOnReadyHandler(Runnable runnable) {} + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int i) {} + + @Override + public void setMessageCompression(boolean b) {} +}; + +TestStream testStream = newStream(clientFactory); +testStream.start(); +ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); +Future sendFuture = +sendExecutor.submit( +() -> +assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1))); +testStream.shutdown(); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838568276 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,128 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838725854 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,128 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); +// If getPhase() returns a value less than 0, the phaser has been terminated. +if (awaitPhase < 0) { + return; +} + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { -awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. -phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +int nextPhase = +isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +// If nextPhase is a value less than 0, the phaser has been terminated. +if (nextPhase < 0) { + throw new StreamObserverCancelledException("StreamObserver was terminated."); +} + synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. 
+ if (currentPhase < 0) return; messagesSinceReady = 0; outboundObserver.onNext(value); return; } } catch (TimeoutException e) { totalSecondsWaited += waitSeconds; if (totalSecondsWaited > deadlineSeconds) { - LOG.error( - "Exceeded timeout waiting for the outboundObserver to become ready meaning " - + "that the stream deadline was not respected."); - throw new RuntimeException(e); + String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited); + LOG.error(errorMessage); + throw new StreamObserverCancelledException(errorMessage, e); } -if (totalSecondsWaited > 30) { + +if (totalSecondsWaited > OUTPUT_CHANNEL_CONSIDERED_STALLED_SECONDS) { LOG.info( "Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, Thread.currentThread().getName()); } + waitSeconds = waitSeconds * 2; } catch (InterruptedException e) { Thread.currentThread().interrupt(); -throw new RuntimeException(e); +throw new StreamObserverCancelledException(e); } } } @Override public void onError(Throwable t) { +isReadyNoti
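The reworked `onNext()` waits on the `isReadyNotifier` phaser with a doubling timeout and treats a negative phase as termination. A self-contained sketch of just that wait using `java.util.concurrent.Phaser`, assuming a single registered party so that each `arrive()` (the on-ready callback) advances the phase. `ReadyGate` and `ObserverCancelledException` are hypothetical names; the real observer also sends the message under `lock` and returns early if it sees termination before waiting, which this sketch does not model.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for StreamObserverCancelledException. */
class ObserverCancelledException extends RuntimeException {
  ObserverCancelledException(String message) {
    super(message);
  }
}

/** Hypothetical gate modelling the isReadyNotifier phaser in DirectStreamObserver. */
class ReadyGate {
  // One registered party, so each arrive() advances the phase by one.
  private final Phaser isReadyNotifier = new Phaser(1);

  /** Called by the transport's on-ready handler. */
  void onReady() {
    isReadyNotifier.arrive();
  }

  /** Frees every thread blocked in awaitReady(), as terminate() does in the diff. */
  void terminate() {
    isReadyNotifier.forceTermination();
  }

  /** Waits for the phase observed on entry to advance, with a doubling poll interval. */
  void awaitReady(long deadlineSeconds) throws InterruptedException {
    int awaitPhase = isReadyNotifier.getPhase();
    if (awaitPhase < 0) {
      throw new ObserverCancelledException("StreamObserver was already terminated.");
    }
    long totalSecondsWaited = 0;
    long waitSeconds = 1;
    while (true) {
      try {
        int nextPhase =
            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
        // A negative phase means forceTermination() was called while we were waiting.
        if (nextPhase < 0) {
          throw new ObserverCancelledException("StreamObserver was terminated.");
        }
        return;
      } catch (TimeoutException e) {
        totalSecondsWaited += waitSeconds;
        if (totalSecondsWaited > deadlineSeconds) {
          throw new ObserverCancelledException(
              "Waited " + totalSecondsWaited + "s for the stream to become ready.");
        }
        waitSeconds *= 2;
      }
    }
  }
}
```

Polling with a bounded, doubling timeout is what lets the observer enforce the stream deadline even when an on-ready notification does not arrive, which the diff's comment notes can happen in practice.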
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838725029 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -156,29 +163,44 @@ public void sendHealthCheck() { protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); -RuntimeException finalException = null; +CommitCompletionException failures = new CommitCompletionException(); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838707399 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -139,4 +166,29 @@ public void onCompleted() { outboundObserver.onCompleted(); } } + + @Override + public void terminate(Throwable terminationException) { +// Free the blocked threads in onNext(). +isReadyNotifier.forceTermination(); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838705361 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + protected boolean hasReceivedShutdownSignal() { +synchronized (shutdownLock) { + return isShutdown; } - -return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (streamClosed.get()) { + if (hasReceivedShutdownSignal()) { +return; + } + + if (streamClosed) { +// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException) throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { +verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); Review Comment: done
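The `verify(!Thread.holdsLock(shutdownLock), ...)` line asserts that the calling thread does not hold `shutdownLock` while it performs a potentially blocking send. Below is a minimal stdlib-only sketch of the same guard. It throws a plain `IllegalStateException` instead of using Guava's `verify`, and `SendLockGuard` is a hypothetical name.

```java
/** Hypothetical sketch of the "shutdownLock must not be held during send" guard. */
final class SendLockGuard {
  private final Object shutdownLock = new Object();

  /** Runs a potentially blocking send, failing fast if the caller holds shutdownLock. */
  void send(Runnable blockingSend) {
    // Thread.holdsLock only inspects the *current* thread, so this catches the caller-side
    // mistake that would leave shutdown() waiting behind blocking IO.
    if (Thread.holdsLock(shutdownLock)) {
      throw new IllegalStateException("shutdownLock should not be held during send.");
    }
    blockingSend.run();
  }

  /** Demonstrates the misuse the guard rejects. */
  void sendWhileHoldingShutdownLock(Runnable blockingSend) {
    synchronized (shutdownLock) {
      send(blockingSend);
    }
  }
}
```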
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838574677 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,128 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); +// If getPhase() returns a value less than 0, the phaser has been terminated. +if (awaitPhase < 0) { + return; +} + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { -awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. -phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +int nextPhase = +isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +// If nextPhase is a value less than 0, the phaser has been terminated. +if (nextPhase < 0) { + throw new StreamObserverCancelledException("StreamObserver was terminated."); +} + synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; messagesSinceReady = 0; outboundObserver.onNext(value); return; } } catch (TimeoutException e) { totalSecondsWaited += waitSeconds; if (totalSecondsWaited > deadlineSeconds) { - LOG.error( - "Exceeded timeout waiting for the outboundObserver to become ready meaning " - + "that the stream deadline was not respected."); - throw new RuntimeException(e); + String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited); + LOG.error(errorMessage); + throw new StreamObserverCancelledException(errorMessage, e); Review Comment: done
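The `onNext` loop polls the phaser with a timeout, doubles the wait after each timeout, and gives up once the accumulated wait exceeds the deadline. The sketch below isolates that loop under a few assumptions. It counts milliseconds rather than seconds so a demo runs quickly. `StreamWaitCancelledException` is a hypothetical stand-in for `StreamObserverCancelledException`. A terminated phaser returns `false` here and the caller decides what to do, whereas the diff throws in that case.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for StreamObserverCancelledException. */
final class StreamWaitCancelledException extends RuntimeException {
  StreamWaitCancelledException(String message, Throwable cause) {
    super(message, cause);
  }
}

final class ReadinessWaiter {
  /**
   * Waits for the phaser to move past awaitPhase, doubling the poll interval after each timeout.
   * Returns true if the phase advanced and false if the phaser was terminated. Throws once the
   * accumulated wait exceeds deadlineMillis.
   */
  static boolean awaitReady(Phaser isReadyNotifier, int awaitPhase, long deadlineMillis) {
    long totalMillisWaited = 0;
    long waitMillis = 1;
    while (true) {
      try {
        int nextPhase =
            isReadyNotifier.awaitAdvanceInterruptibly(
                awaitPhase, waitMillis, TimeUnit.MILLISECONDS);
        return nextPhase >= 0; // a negative phase means the phaser was terminated
      } catch (TimeoutException e) {
        totalMillisWaited += waitMillis;
        if (totalMillisWaited > deadlineMillis) {
          throw new StreamWaitCancelledException(
              "Waited " + totalMillisWaited + "ms for the stream to become ready.", e);
        }
        waitMillis *= 2; // exponential backoff between polls
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new StreamWaitCancelledException("Interrupted while waiting for readiness.", e);
      }
    }
  }
}
```

With a 10 ms deadline and no readiness signal, the waits are 1, 2, 4 and 8 ms. The exception is thrown after about 15 ms, once the running total first exceeds the deadline.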
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838585634 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/TerminatingStreamObserver.java: ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers; + +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; + +@Internal +public interface TerminatingStreamObserver extends StreamObserver { + + /** Terminates the StreamObserver. */ Review Comment: done
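`TerminatingStreamObserver` extends `StreamObserver` with a single `terminate(Throwable)` method. The sketch below reproduces that shape without gRPC on the classpath. `BasicStreamObserver` is a hypothetical stand-in for `io.grpc.stub.StreamObserver`, and `RecordingTerminatingObserver` is an illustrative implementation, not the PR's `DirectStreamObserver`. Silently dropping `onNext` calls after termination mirrors the early `return` that `DirectStreamObserver.onNext` takes when it observes a negative phase.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch compiles without gRPC. */
interface BasicStreamObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

/** Same shape as TerminatingStreamObserver: an observer that can be force-closed. */
interface BasicTerminatingObserver<T> extends BasicStreamObserver<T> {
  /** Terminates the observer; later calls must not reach the underlying stream. */
  void terminate(Throwable terminationException);
}

/** Hypothetical in-memory implementation that drops values once terminated. */
final class RecordingTerminatingObserver implements BasicTerminatingObserver<String> {
  private final List<String> sent = new ArrayList<>();
  private Throwable terminationCause; // null until terminated; guarded by this

  @Override
  public synchronized void onNext(String value) {
    if (terminationCause == null) {
      sent.add(value);
    }
  }

  @Override
  public synchronized void onError(Throwable t) {
    terminate(t);
  }

  @Override
  public synchronized void onCompleted() {
    terminate(new IllegalStateException("Stream completed."));
  }

  @Override
  public synchronized void terminate(Throwable terminationException) {
    if (terminationCause == null) {
      terminationCause = terminationException;
    }
  }

  synchronized List<String> sent() {
    return new ArrayList<>(sent);
  }

  synchronized boolean isTerminated() {
    return terminationCause != null;
  }
}
```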
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838570350 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,128 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); +// If getPhase() returns a value less than 0, the phaser has been terminated. +if (awaitPhase < 0) { + return; +} + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { -awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. -phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +int nextPhase = +isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +// If nextPhase is a value less than 0, the phaser has been terminated. +if (nextPhase < 0) { + throw new StreamObserverCancelledException("StreamObserver was terminated."); +} + synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838561714 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException { "Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, Thread.currentThread().getName()); + Thread.dumpStack(); Review Comment: removed, this was for debugging ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java: ## @@ -472,7 +473,8 @@ private void flushResponse() { responseObserver.onNext(responseBuilder.build()); } catch (Exception e) { // Stream is already closed. -System.out.println("trieu: " + e); +LOG.warn("trieu: ", e); Review Comment: removed, this was for debugging
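If the stack of a stalled thread is still wanted, the reviewer's suggestion was to log it rather than call `Thread.dumpStack()`, which writes to stderr and bypasses the logging configuration. The reviewer pointed to an existing `StringUtils.arrayToNewlines(...)` helper in the codebase. The sketch below is a stdlib-only equivalent under a hypothetical name, not that helper.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper: renders the current thread's top frames as a loggable string. */
final class StackTraceStrings {
  static String currentStack(int maxFrames) {
    return Arrays.stream(Thread.currentThread().getStackTrace())
        .limit(maxFrames) // keep the log line bounded
        .map(StackTraceElement::toString)
        .collect(Collectors.joining("\n"));
  }
}
```

A call site might look like `LOG.info("Output channel stalled for {}s, outbound thread {}, stack:\n{}", totalSecondsWaited, Thread.currentThread().getName(), StackTraceStrings.currentStack(10));`.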
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838569451 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,128 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838563507 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -317,10 +320,13 @@ public String backendWorkerToken() { return backendWorkerToken; } + @SuppressWarnings("GuardedBy") @Override public final void shutdown() { -// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown. +// Don't lock on "this" before poisoning the request observer since otherwise the observer may +// be blocking in send(). requestObserver.poison(); +isShutdown = true; Review Comment: cleaned up, this was supposed to stay within the sync block
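The ordering discussed here has two parts. `shutdown()` poisons the request observer without taking the monitor on `this`, because a `send()` blocked in IO may hold that monitor. It then sets `isShutdown` inside the `shutdownLock` block. Below is a minimal sketch of that ordering. `ShutdownOrderingSketch`, its `AtomicBoolean` standing in for `requestObserver.poison()`, and the demo method are all hypothetical. The demo holds the stream's monitor on another thread to show that `shutdown()` still completes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the shutdown ordering discussed above; not the PR's class. */
final class ShutdownOrderingSketch {
  private final Object shutdownLock = new Object();
  private final AtomicBoolean observerPoisoned = new AtomicBoolean(false);
  private boolean isShutdown = false; // guarded by shutdownLock

  /** Deliberately not synchronized on "this": a send() blocked in IO may hold that monitor. */
  void shutdown() {
    observerPoisoned.set(true); // stand-in for requestObserver.poison()
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  boolean hasReceivedShutdownSignal() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }

  /** Returns false instead of sending once shutdown has been signalled. */
  synchronized boolean trySend(String request) {
    if (hasReceivedShutdownSignal() || observerPoisoned.get()) {
      return false;
    }
    // A real stream would call requestObserver.onNext(request) here.
    return true;
  }

  /** Demo: shutdown() returns even while another thread holds the monitor on the stream. */
  static boolean shutdownCompletesWhileMonitorHeld() {
    ShutdownOrderingSketch stream = new ShutdownOrderingSketch();
    CountDownLatch monitorHeld = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    Thread blockedSender =
        new Thread(
            () -> {
              synchronized (stream) { // simulates send() blocked on IO
                monitorHeld.countDown();
                try {
                  release.await();
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
              }
            });
    blockedSender.start();
    try {
      monitorHeld.await();
      stream.shutdown(); // would block forever here if shutdown() synchronized on "this"
      boolean signalled = stream.hasReceivedShutdownSignal();
      release.countDown();
      blockedSender.join();
      return signalled;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The lock order is always `this` before `shutdownLock` (in `trySend`), and `shutdown()` takes only `shutdownLock`, so the two paths cannot deadlock.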
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1838119412 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException { "Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, Thread.currentThread().getName()); + Thread.dumpStack(); Review Comment: if you want to submit, log this instead. We've used things like StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10); elsewhere to get a string to log ## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java: ## @@ -472,7 +473,8 @@ private void flushResponse() { responseObserver.onNext(responseBuilder.build()); } catch (Exception e) { // Stream is already closed. -System.out.println("trieu: " + e); +LOG.warn("trieu: ", e); Review Comment: rm debug logs ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -144,41 +149,57 @@ protected boolean hasPendingRequests() { } @Override - public void sendHealthCheck() { + public void sendHealthCheck() throws WindmillStreamShutdownException { if (hasPendingRequests()) { StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder(); builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID); - send(builder.build()); + trySend(builder.build()); } } @Override protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); -RuntimeException finalException = null; +CommitCompletionException failures = new CommitCompletionException(); for (int i = 0; i < response.getRequestIdCount(); ++i) { Review Comment: ping for the above comment since it might be lost in GitHub comment
threads. I was thinking the builder would not be an exception itself, and then the exception would just be a simple class without mutating methods. ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,76 +72,129 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); +// If getPhase() returns a value less than 0, the phaser has been terminated. +if (awaitPhase < 0) { + return; +} + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { -awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't al
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837622491 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -152,114 +149,157 @@ private static long debugDuration(long nowMs, long startMs) { */ protected abstract void startThrottleTimer(); - /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + /** Try to send a request to the server. Returns true if the request was successfully sent. */ + @CanIgnoreReturnValue + protected final synchronized boolean trySend(RequestT request) + throws WindmillStreamShutdownException { +debugMetrics.recordSend(); +try { + requestObserver.onNext(request); + return true; +} catch (StreamClosedException e) { + // Stream was broken, requests may be retried when stream is reopened. } -return requestObserver; +return false; } - /** Send a request to the server. */ - protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); + @Override + public final void start() { +boolean shouldStartStream = false; synchronized (this) { - if (streamClosed.get()) { -throw new IllegalStateException("Send called on a client closed stream."); + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; } +} - requestObserver().onNext(request); +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. 
streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + debugMetrics.recordStart(); + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +// shutdown() is responsible for cleaning up pending requests. +logger.debug("Stream was shutdown while creating new stream.", e); } catch (Exception e) { -LOG.error("Failed to create new stream, retrying: ", e); +logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); -} catch (InterruptedException | IOException i) { + debugMetrics.recordSleep(sleep); + sleeper.sleep(sleep); +} catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info( + "Interrupted during {} creation backoff. The stream will not be created.", + getClass()); + // Shutdown the stream to clean up any dangling resources and pending requests. + shutdown(); + break; +} catch (IOException ioe) { // Keep trying to create the stream. } } } + +// We were never able to start the stream, remove it from the stream registry. Otherwise, it is +// removed when closed. +streamRegistry.remove(this); } - protected final Executor executor() { -return executor; + /** + * Execute the runnable using the {@link #executor} handling the executor being in a shutdown + * state. 
+ */ + protected final void executeSafely(Runnable runnable) { +try { + executor.execute(runnable); +} catch (RejectedExecutionException e) { + logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken); +} } public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) { -if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) { +if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { try { sendHealthCheck(); - } catch (RuntimeException e) { -LOG.debug("Received exception sending health check.", e); + } catch (Exception e) { +logger.debug("Received exception sending health check.", e); } } } - p
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837621434 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -305,77 +341,96 @@ public void onNext(ResponseT response) { } catch (IOException e) { // Ignore. } - lastResponseTimeMs.set(Instant.now().getMillis()); + debugMetrics.recordResponse(); onResponse(response); } @Override public void onError(Throwable t) { - onStreamFinished(t); + if (maybeTeardownStream()) { +return; + } + + recordStreamStatus(Status.fromThrowable(t)); + + try { +long sleep = backoff.nextBackOffMillis(); +debugMetrics.recordSleep(sleep); +sleeper.sleep(sleep); + } catch (InterruptedException e) { +Thread.currentThread().interrupt(); +return; + } catch (IOException e) { +// Ignore. + } + + executeSafely(AbstractWindmillStream.this::startStream); } @Override public void onCompleted() { - onStreamFinished(null); + if (maybeTeardownStream()) { +return; + } + recordStreamStatus(OK_STATUS); + executeSafely(AbstractWindmillStream.this::startStream); } -private void onStreamFinished(@Nullable Throwable t) { - synchronized (this) { -if (isShutdown.get() || (clientClosed.get() && !hasPendingRequests())) { - streamRegistry.remove(AbstractWindmillStream.this); - finishLatch.countDown(); - return; -} - } - if (t != null) { -Status status = null; -if (t instanceof StatusRuntimeException) { - status = ((StatusRuntimeException) t).getStatus(); -} -String statusError = status == null ? 
"" : status.toString(); -setLastError(statusError); -if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) { +private void recordStreamStatus(Status status) { + int currentRestartCount = debugMetrics.incrementAndGetRestarts(); + if (status.isOk()) { +String restartReason = +"Stream completed successfully but did not complete requested operations, " ++ "recreating"; +logger.warn(restartReason); +debugMetrics.recordRestartReason(restartReason); + } else { +int currentErrorCount = debugMetrics.incrementAndGetErrors(); +debugMetrics.recordRestartReason(status.toString()); +Throwable t = status.getCause(); +if (t instanceof StreamObserverCancelledException) { + logger.error( + "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}", + getClass(), + backendWorkerToken, + t.getStackTrace(), + t); +} else if (currentRestartCount % logEveryNStreamFailures == 0) { + // Don't log every restart since it will get noisy, and many errors transient. long nowMillis = Instant.now().getMillis(); - String responseDebug; - if (lastResponseTimeMs.get() == 0) { -responseDebug = "never received response"; - } else { -responseDebug = -"received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago"; - } - LOG.debug( - "{} streaming Windmill RPC errors for {}, last was: {} with status {}." - + " created {}ms ago, {}. This is normal with autoscaling.", + logger.debug( + "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}" + + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.", AbstractWindmillStream.this.getClass(), - errorCount.get(), + currentRestartCount, + currentErrorCount, t, - statusError, - nowMillis - startTimeMs.get(), - responseDebug); + status, + nowMillis - debugMetrics.getStartTimeMs(), + debugMetrics + .responseDebugString(nowMillis) + .orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING)); } + // If the stream was stopped due to a resource exhausted error then we are throttled. 
-if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { +if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { Review Comment: done
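The new restart path in `onError`/`onCompleted` has two parts. It records the status, logging only every Nth restart but always logging an unexpected `StreamObserverCancelledException`. It then resubmits `startStream` through `executeSafely`, which tolerates a shut-down executor. The sketch below models those two pieces without gRPC types. `RestartPathSketch` is a hypothetical name, and `executeSafely` returns a boolean here only so it can be checked. In the PR it returns `void` and logs at debug level.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical sketch of the restart path: sampled logging, then a guarded re-submit. */
final class RestartPathSketch {
  private final int logEveryNStreamFailures;
  private final Executor executor;
  private int restartCount = 0; // guarded by this

  RestartPathSketch(int logEveryNStreamFailures, Executor executor) {
    if (logEveryNStreamFailures <= 0) {
      throw new IllegalArgumentException("logEveryNStreamFailures must be positive");
    }
    this.logEveryNStreamFailures = logEveryNStreamFailures;
    this.executor = executor;
  }

  /**
   * Counts a restart and reports whether it should be logged. Unexpected cancellations are
   * always logged; other errors only on every Nth restart, since most are transient.
   */
  synchronized boolean recordRestartAndShouldLog(boolean unexpectedCancellation) {
    int currentRestartCount = ++restartCount;
    return unexpectedCancellation || currentRestartCount % logEveryNStreamFailures == 0;
  }

  /** Submits the restart, returning false instead of throwing if the executor is shut down. */
  boolean executeSafely(Runnable restart) {
    try {
      executor.execute(restart);
      return true;
    } catch (RejectedExecutionException e) {
      return false; // the PR logs at debug level here
    }
  }
}
```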
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837624075 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -278,23 +318,19 @@ public String backendWorkerToken() { } @Override - public void shutdown() { -if (isShutdown.compareAndSet(false, true)) { - requestObserver() - .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream.")); + public final void shutdown() { +// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown. Review Comment: done
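`requestObserver.poison()` goes through `ResettableThrowingStreamObserver`, which works as a small state machine. `reset()` installs a fresh delegate. `poison()` permanently disables the observer and terminates the current delegate. A completed delegate stays closed until the next `reset()`. The sketch below models those transitions under stated assumptions. All names are hypothetical. The exceptions are unchecked stand-ins to keep the sketch short, whereas the PR declares `StreamClosedException` and `WindmillStreamShutdownException` in `throws` clauses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical unchecked stand-in for WindmillStreamShutdownException. */
final class SketchShutdownException extends RuntimeException {
  SketchShutdownException(String message) {
    super(message);
  }
}

/** Hypothetical unchecked stand-in for StreamClosedException. */
final class SketchClosedException extends RuntimeException {
  SketchClosedException(String message) {
    super(message);
  }
}

/** Minimal observer shape (not gRPC). */
interface SketchObserver<T> {
  void onNext(T value);

  void onCompleted();

  void terminate(Throwable cause);
}

/** Hypothetical sketch of the reset/poison state machine; not the PR's class. */
final class ResettableObserverSketch<T> {
  private final Supplier<SketchObserver<T>> factory;
  private SketchObserver<T> delegate; // guarded by this
  private boolean isPoisoned = false; // guarded by this
  private boolean isCurrentStreamClosed = false; // guarded by this

  ResettableObserverSketch(Supplier<SketchObserver<T>> factory) {
    this.factory = factory;
  }

  private synchronized SketchObserver<T> delegate() {
    if (isPoisoned) {
      throw new SketchShutdownException("Stream is already shutdown.");
    }
    if (isCurrentStreamClosed) {
      throw new SketchClosedException("Current stream is closed; reset required.");
    }
    if (delegate == null) {
      throw new IllegalStateException("Missing a call to reset() to initialize.");
    }
    return delegate;
  }

  /** Installs a fresh delegate, unless the observer has been poisoned. */
  synchronized void reset() {
    if (isPoisoned) {
      throw new SketchShutdownException("Stream is already shutdown.");
    }
    delegate = factory.get();
    isCurrentStreamClosed = false;
  }

  /** Permanently disables the observer and terminates the current delegate, if any. */
  synchronized void poison() {
    if (isPoisoned) {
      return;
    }
    isPoisoned = true;
    if (delegate != null) {
      delegate.terminate(new SketchShutdownException("Explicit call to shutdown stream."));
      delegate = null;
      isCurrentStreamClosed = true;
    }
  }

  /** Looks up the delegate under the lock but sends outside it, so poison() is never blocked. */
  void onNext(T value) {
    SketchObserver<T> current = delegate();
    current.onNext(value);
  }

  /** Closes the current delegate; reset() is required before sending again. */
  synchronized void onCompleted() {
    SketchObserver<T> current = delegate();
    isCurrentStreamClosed = true;
    current.onCompleted();
  }
}

/** Test helper: records received values and whether it was terminated. */
final class ListObserver implements SketchObserver<String> {
  final List<String> received = new ArrayList<>();
  boolean terminated = false;

  @Override public void onNext(String value) { received.add(value); }
  @Override public void onCompleted() {}
  @Override public void terminate(Throwable cause) { terminated = true; }
}
```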
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837626505 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.slf4j.Logger; + +/** + * Request observer that allows resetting its internal delegate using the given {@link + * #streamObserverFactory}. 
+ * + * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be + * {@link ThreadSafe}. Has same methods declared in {@link StreamObserver}, but they throw + * {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which much be + * handled by callers. + */ +@ThreadSafe +@Internal +final class ResettableThrowingStreamObserver { + private final Supplier> streamObserverFactory; + private final Logger logger; + + @GuardedBy("this") + private @Nullable TerminatingStreamObserver delegateStreamObserver; + + @GuardedBy("this") + private boolean isPoisoned = false; + + /** + * Indicates that the current delegate is closed via {@link #poison() or {@link #onCompleted()}}. + * If not poisoned, a call to {@link #reset()} is required to perform future operations on the + * StreamObserver. + */ + @GuardedBy("this") + private boolean isCurrentStreamClosed = false; + + ResettableThrowingStreamObserver( + Supplier> streamObserverFactory, Logger logger) { +this.streamObserverFactory = streamObserverFactory; +this.logger = logger; +this.delegateStreamObserver = null; + } + + private synchronized StreamObserver delegate() + throws WindmillStreamShutdownException, StreamClosedException { +if (isPoisoned) { + throw new WindmillStreamShutdownException("Stream is already shutdown."); +} + +if (isCurrentStreamClosed) { + throw new StreamClosedException( + "Current stream is closed, requires reset for future stream operations."); +} + +return Preconditions.checkNotNull( +delegateStreamObserver, +"requestObserver cannot be null. Missing a call to startStream() to initialize."); + } + + /** Creates a new delegate to use for future {@link StreamObserver} methods. 
*/ + synchronized void reset() throws WindmillStreamShutdownException { +if (isPoisoned) { + throw new WindmillStreamShutdownException("Stream is already shutdown."); +} + +delegateStreamObserver = streamObserverFactory.get(); +isCurrentStreamClosed = false; + } + + /** + * Indicates that the request observer should no longer be used. Attempts to perform operations on + * the request observer will throw an {@link WindmillStreamShutdownException}. + */ + synchronized void poison() { +if (!isPoisoned) { + isPoisoned = true; + if (delegateStreamObserver != null) { +delegateStreamObserver.terminate( +new WindmillStreamShutdownException("Explicit call to shutdown stream.")); +delegateStreamObserver = null; +isCurrentStreamClosed = true; + } +} + } + + public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownException { +// Make sure onNext and onError below to be called on the same StreamObserver instance. +StreamObserver delegate = delegate(); +try { + // Do NOT lock while sending message over the stream as this will block other StreamObserver + // operation
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837625329 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.slf4j.Logger; + +/** + * Request observer that allows resetting its internal delegate using the given {@link + * #streamObserverFactory}. 
+ * + * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be + * {@link ThreadSafe}. Has same methods declared in {@link StreamObserver}, but they throw + * {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which much be + * handled by callers. + */ +@ThreadSafe +@Internal +final class ResettableThrowingStreamObserver { + private final Supplier> streamObserverFactory; + private final Logger logger; + + @GuardedBy("this") + private @Nullable TerminatingStreamObserver delegateStreamObserver; + + @GuardedBy("this") + private boolean isPoisoned = false; + + /** + * Indicates that the current delegate is closed via {@link #poison() or {@link #onCompleted()}}. + * If not poisoned, a call to {@link #reset()} is required to perform future operations on the + * StreamObserver. + */ + @GuardedBy("this") + private boolean isCurrentStreamClosed = false; + + ResettableThrowingStreamObserver( + Supplier> streamObserverFactory, Logger logger) { +this.streamObserverFactory = streamObserverFactory; +this.logger = logger; +this.delegateStreamObserver = null; + } + + private synchronized StreamObserver delegate() + throws WindmillStreamShutdownException, StreamClosedException { +if (isPoisoned) { + throw new WindmillStreamShutdownException("Stream is already shutdown."); +} + +if (isCurrentStreamClosed) { + throw new StreamClosedException( + "Current stream is closed, requires reset for future stream operations."); +} + +return Preconditions.checkNotNull( +delegateStreamObserver, +"requestObserver cannot be null. Missing a call to startStream() to initialize."); Review Comment: done
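The check order in the quoted `delegate()` (poisoned first, then closed, then uninitialized) is what lets callers tell a permanent shutdown apart from a stream that only needs a `reset()`. Below is a simplified, stand-alone sketch of that pattern. It assumes a plain `java.util.function.Consumer` as the delegate instead of gRPC's `StreamObserver`, drops the `terminate()` call that the real `poison()` makes, and uses hypothetical names (`ResettableHolder`, `ShutdownException`, `ClosedException`, `closeCurrent()`) rather than Beam's API.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for WindmillStreamShutdownException: the holder is permanently unusable.
class ShutdownException extends Exception {
  ShutdownException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for StreamClosedException: a reset() is needed before further use.
class ClosedException extends Exception {
  ClosedException(String message) {
    super(message);
  }
}

/** Simplified sketch of a delegate holder that can be reset until it is poisoned. */
final class ResettableHolder<T> {
  private final Supplier<Consumer<T>> factory;
  private Consumer<T> current; // guarded by this; null until the first reset()
  private boolean poisoned = false; // guarded by this
  private boolean currentClosed = false; // guarded by this

  ResettableHolder(Supplier<Consumer<T>> factory) {
    this.factory = factory;
  }

  // Same check order as the quoted delegate(): poisoned, then closed, then uninitialized.
  private synchronized Consumer<T> delegate() throws ShutdownException, ClosedException {
    if (poisoned) {
      throw new ShutdownException("Stream is already shutdown.");
    }
    if (currentClosed) {
      throw new ClosedException("Current stream is closed, requires reset.");
    }
    return Objects.requireNonNull(current, "Missing a call to reset() to initialize.");
  }

  synchronized void reset() throws ShutdownException {
    if (poisoned) {
      throw new ShutdownException("Stream is already shutdown.");
    }
    current = factory.get();
    currentClosed = false;
  }

  synchronized void closeCurrent() {
    currentClosed = true;
  }

  synchronized void poison() {
    poisoned = true;
    current = null;
    currentClosed = true;
  }

  void onNext(T value) throws ShutdownException, ClosedException {
    // Fetch the delegate under the lock, but send outside of it so a slow send
    // does not block reset() or poison() callers.
    delegate().accept(value);
  }
}
```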
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837618004 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java: ## @@ -59,6 +59,9 @@ final class StreamDebugMetrics { @GuardedBy("this") private DateTime shutdownTime = null; Review Comment: done
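Marking `shutdownTime` as nullable pairs naturally with an accessor that never returns `null`. Here is a minimal sketch. It assumes `java.time.Instant` in place of Joda's `DateTime` and uses a hypothetical `ShutdownTimeRecorder` class, not the real `StreamDebugMetrics`. The `@Nullable` annotation appears only in a comment so that the snippet compiles against the JDK alone.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical sketch: a lock-guarded timestamp that is null until shutdown is recorded. */
final class ShutdownTimeRecorder {
  // Guarded by "this". With a nullness checker enabled, this field would be annotated @Nullable.
  private Instant shutdownTime = null;

  synchronized void recordShutdown(Instant now) {
    shutdownTime = now;
  }

  /** Never returns null; an empty Optional means no shutdown has been recorded yet. */
  synchronized Optional<Instant> shutdownTime() {
    return Optional.ofNullable(shutdownTime);
  }
}
```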
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837608239 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) { } /** Returns true if the stream was torn down and should not be restarted internally. */ -private synchronized boolean maybeTeardownStream() { - if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) { -streamRegistry.remove(AbstractWindmillStream.this); -finishLatch.countDown(); -executor.shutdownNow(); -return true; - } +private boolean maybeTeardownStream() { + synchronized (AbstractWindmillStream.this) { Review Comment: Fixed by moving it out of the inner class. The GuardedBy check was failing because the reference to `this` was ambiguous. ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) { } /** Returns true if the stream was torn down and should not be restarted internally. */ -private synchronized boolean maybeTeardownStream() { - if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) { -streamRegistry.remove(AbstractWindmillStream.this); -finishLatch.countDown(); -executor.shutdownNow(); -return true; - } +private boolean maybeTeardownStream() { + synchronized (AbstractWindmillStream.this) { +if (isShutdown || (clientClosed && !hasPendingRequests())) { + streamRegistry.remove(AbstractWindmillStream.this); Review Comment: Fixed by moving it out of the inner class. The GuardedBy check was failing because the reference to `this` was ambiguous.
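The ambiguity described above comes from Java's inner-class scoping. Inside a non-static inner class, a `synchronized` method (or `synchronized (this)`) locks the inner instance. By contrast, `synchronized (Outer.this)` locks the enclosing instance, which is the lock that the outer class's `@GuardedBy("this")` fields refer to. Moving the method into the outer class therefore lets a plain `synchronized` method take the intended lock. The sketch below uses hypothetical `Outer`/`Inner` classes and `Thread.holdsLock` to show which monitor each form acquires.

```java
/** Hypothetical classes showing which monitor each synchronized form acquires. */
final class Outer {
  final Inner inner = new Inner();

  // Locks the Outer instance: the same lock an outer @GuardedBy("this") field refers to.
  synchronized boolean outerMethodHoldsOuterLock() {
    return Thread.holdsLock(this);
  }

  final class Inner {
    // A synchronized method on the inner class locks the Inner instance, not Outer.
    synchronized boolean innerMethodHoldsOuterLock() {
      return Thread.holdsLock(Outer.this);
    }

    // Explicitly synchronizing on Outer.this acquires the enclosing instance's monitor.
    boolean blockOnOuterHoldsOuterLock() {
      synchronized (Outer.this) {
        return Thread.holdsLock(Outer.this);
      }
    }
  }
}
```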
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1837603481 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -272,7 +280,7 @@ public final void appendSummaryHtml(PrintWriter writer) { summaryMetrics.timeSinceLastSend(), summaryMetrics.timeSinceLastResponse(), requestObserver.isClosed(), -hasReceivedShutdownSignal(), +summaryMetrics.shutdownTime().isPresent(), Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1833955728 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) { } /** Returns true if the stream was torn down and should not be restarted internally. */ -private synchronized boolean maybeTeardownStream() { - if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) { -streamRegistry.remove(AbstractWindmillStream.this); -finishLatch.countDown(); -executor.shutdownNow(); -return true; - } +private boolean maybeTeardownStream() { + synchronized (AbstractWindmillStream.this) { Review Comment: Just make it a synchronized method? ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java: ## @@ -59,6 +59,9 @@ final class StreamDebugMetrics { @GuardedBy("this") private DateTime shutdownTime = null; Review Comment: Mark nullable (and above). I think you still need to fix the over-exemption of windmill classes from the checker: https://github.com/apache/beam/issues/30183 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -272,7 +280,7 @@ public final void appendSummaryHtml(PrintWriter writer) { summaryMetrics.timeSinceLastSend(), summaryMetrics.timeSinceLastResponse(), requestObserver.isClosed(), -hasReceivedShutdownSignal(), +summaryMetrics.shutdownTime().isPresent(), Review Comment: nit: could remove this part since it is redundant with the null shutdown time.
Could render null differently, maybe something like `summaryMetrics.shutdownTime().map(Instant::toString).orElse("not shutdown")`. ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) { } /** Returns true if the stream was torn down and should not be restarted internally. */ -private synchronized boolean maybeTeardownStream() { - if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) { -streamRegistry.remove(AbstractWindmillStream.this); -finishLatch.countDown(); -executor.shutdownNow(); -return true; - } +private boolean maybeTeardownStream() { + synchronized (AbstractWindmillStream.this) { +if (isShutdown || (clientClosed && !hasPendingRequests())) { + streamRegistry.remove(AbstractWindmillStream.this); Review Comment: Why doesn't just "this" work? ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.slf4j.Logger; + +/** + * Request observer that allows resetting its internal delegate using the given {@link + * #streamObserverFactory}. + * + * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be + * {@link ThreadSafe}. Has same methods declared in {@link StreamObserver
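The suggested rendering, `summaryMetrics.shutdownTime().map(Instant::toString).orElse("not shutdown")`, folds the presence check and the value into a single status-page field. That is why a separate "is shut down" boolean becomes redundant. This is a small sketch assuming the accessor returns `Optional<java.time.Instant>`; the real `StreamDebugMetrics` may use a Joda type. `ShutdownStatus.describeShutdown` is a hypothetical helper.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical helper for a status-page column. */
final class ShutdownStatus {
  private ShutdownStatus() {}

  /** Renders the shutdown instant, or a placeholder when the stream has not been shut down. */
  static String describeShutdown(Optional<Instant> shutdownTime) {
    return shutdownTime.map(Instant::toString).orElse("not shutdown");
  }
}
```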
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831934329 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); + protected synchronized boolean hasReceivedShutdownSignal() { Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831921566 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -67,61 +69,86 @@ public DirectStreamObserver( } @Override - public void onNext(T value) { + public void onNext(T value) throws StreamObserverCancelledException { int awaitPhase = -1; long totalSecondsWaited = 0; long waitSeconds = 1; while (true) { try { synchronized (lock) { + int currentPhase = isReadyNotifier.getPhase(); + // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted + // are synchronized after terminating the phaser if we observe that the phaser is not + // terminated the onNext calls below are guaranteed to not be called on a closed observer. + if (currentPhase < 0) return; + + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { +awaitPhase = isReadyNotifier.getPhase(); +// If getPhase() returns a value less than 0, the phaser has been terminated. +if (awaitPhase < 0) { + return; +} + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. 
- if (awaitPhase < 0) { -awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. -phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +int nextPhase = +isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); +// If nextPhase is a value less than 0, the phaser has been terminated. +if (nextPhase < 0) { + return; Review Comment: done
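The `onNext` loop above relies on two documented `java.util.concurrent.Phaser` behaviors. First, `getPhase()` and `awaitAdvanceInterruptibly(phase, timeout, unit)` return a negative value once the phaser is terminated. Second, `forceTermination()` releases any thread blocked in such a wait. The sketch below shows that handshake in isolation. The `IsReadyGate` class and its methods are hypothetical stand-ins for the `isReadyNotifier` phaser (the diff suggests it is the renamed `phaser`) and the callbacks around it, not Beam's code.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical gate: waiters block until readiness is signalled or the gate is terminated. */
final class IsReadyGate {
  // One registered party, so each arrive() advances the phase (as an onReady callback would).
  private final Phaser phaser = new Phaser(1);

  /** Observe the phase before checking readiness; negative once terminated. */
  int currentPhase() {
    return phaser.getPhase();
  }

  /** Called from an onReady-style callback. */
  void signalReady() {
    phaser.arrive();
  }

  /** Called on shutdown: releases all waiters; later waits return immediately. */
  void terminate() {
    phaser.forceTermination();
  }

  /**
   * Waits for the previously observed phase to advance. Returns false if the gate is terminated,
   * true if readiness was signalled; throws TimeoutException so callers can poll periodically.
   */
  boolean awaitReady(int observedPhase, long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException {
    if (observedPhase < 0) {
      return false; // Already terminated when the phase was observed.
    }
    int next = phaser.awaitAdvanceInterruptibly(observedPhase, timeout, unit);
    return next >= 0;
  }
}
```

Observing the phase before checking readiness matters. If the callback fires between the check and the wait, the phase has already advanced, so the wait returns immediately instead of missing the signal.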
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831907579 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */ +public final class WindmillStreamShutdownException extends Exception { Review Comment: I think it's also nice to have both, since we usually ignore the StreamClosedException and we always propagate the WindmillStreamShutdownException. Might it be useful to just have send return true/false instead of throwing StreamClosedException?
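The alternative floated in this comment is to have `send` report a closed (but restartable) stream through its return value while still throwing on shutdown. A rough sketch of that idea follows. All names in it (`BooleanSendStream`, `StreamShutdownError`, `trySend`, `closeCurrentStream`) are hypothetical. The sketch only illustrates the trade-off: callers that usually ignore the closed case no longer need a `catch` block, while shutdown still propagates as a checked exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical checked exception that callers must propagate.
class StreamShutdownError extends Exception {
  StreamShutdownError(String message) {
    super(message);
  }
}

/** Hypothetical stream where a closed stream is reported by trySend's return value. */
final class BooleanSendStream {
  private final List<String> sent = new ArrayList<>(); // guarded by this
  private boolean closed = false; // guarded by this
  private boolean shutdown = false; // guarded by this

  /** Returns false if the current stream is closed; throws only if the stream is shut down. */
  synchronized boolean trySend(String request) throws StreamShutdownError {
    if (shutdown) {
      throw new StreamShutdownError("Stream is already shutdown.");
    }
    if (closed) {
      return false; // Closed but not shut down: report it instead of throwing.
    }
    sent.add(request);
    return true;
  }

  synchronized void closeCurrentStream() {
    closed = true;
  }

  synchronized void shutdown() {
    shutdown = true;
  }

  synchronized List<String> sent() {
    return new ArrayList<>(sent);
  }
}
```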
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831905512 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */ +public final class WindmillStreamShutdownException extends Exception { Review Comment: It's not really retryable; the stream just retries internally to flush any pending messages. So I just updated the comment and moved both exceptions into `WindmillStreamExceptions.java`.
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831900037 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -139,4 +166,29 @@ public void onCompleted() { outboundObserver.onCompleted(); } } + + @Override + public void terminate(Throwable terminationException) { +// Free the blocked threads in onNext(). +isReadyNotifier.forceTermination(); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831884637 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */ +public final class WindmillStreamShutdownException extends Exception { Review Comment: Renamed StreamClosedException to RetryableStreamClosedException, since we will try to flush all of the pending requests in this case. Shutdown, on the other hand, means that we are no longer operating on the stream, since it is invalid.
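The distinction drawn here, a retryable "closed" condition versus a terminal shutdown, maps onto two checked exception types that callers handle differently. The sketch below is hypothetical: `RetryableClosedException`, `TerminalShutdownException`, `Sender` and `PendingRequests.sendOrQueue` are illustrative names, not the PR's classes. On a retryable close the request stays queued, and on shutdown the exception propagates to the caller.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical: the current stream closed, but a new stream may flush pending requests.
class RetryableClosedException extends Exception {
  RetryableClosedException(String message) {
    super(message);
  }
}

// Hypothetical: the stream is shut down and must not be used again.
class TerminalShutdownException extends Exception {
  TerminalShutdownException(String message) {
    super(message);
  }
}

interface Sender {
  void send(String request) throws RetryableClosedException, TerminalShutdownException;
}

/** Single-threaded sketch of a pending-request queue. */
final class PendingRequests {
  private final Deque<String> pending = new ArrayDeque<>();

  /** Sends the request; on a retryable close it stays queued as pending. */
  void sendOrQueue(Sender sender, String request) throws TerminalShutdownException {
    pending.addLast(request);
    try {
      sender.send(request);
      pending.removeLast(); // Sent successfully, so it is no longer pending.
    } catch (RetryableClosedException e) {
      // Keep the request queued; flushing it on a new stream is left to the caller here.
    }
    // TerminalShutdownException is deliberately not caught: callers must stop using the stream.
  }

  int pendingCount() {
    return pending.size();
  }
}
```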
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831883547 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -139,4 +166,29 @@ public void onCompleted() { outboundObserver.onCompleted(); Review Comment: Looking at how this is used in StreamObserverFactory, we call phaser.forceTermination in `ForwardingClientResponseObserver#onDone`. Wouldn't we then always be calling onError here, since the onDoneHandler runs and terminates the StreamObserver?
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831879138 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ## @@ -139,4 +166,29 @@ public void onCompleted() { outboundObserver.onCompleted(); } } + + @Override + public void terminate(Throwable terminationException) { +// Free the blocked threads in onNext(). +isReadyNotifier.forceTermination(); +try { + onError(terminationException); +} catch (RuntimeException e) { + // If onError or onComplete was previously called, this will throw. Review Comment: done
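The quoted `terminate` releases blocked senders and then calls `onError`, tolerating the case where the observer was already closed by catching the resulting `RuntimeException`. An alternative that avoids relying on that exception is a close-once flag checked under the same lock as the outbound terminal calls. The sketch below is hypothetical. It uses a minimal `Observer` interface in place of gRPC's `StreamObserver` and omits the phaser that the real code terminates to free threads blocked in `onNext`.

```java
/** Hypothetical minimal observer interface standing in for gRPC's StreamObserver. */
interface Observer<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

/** Hypothetical wrapper that forwards at most one terminal signal to its delegate. */
final class CloseOnceObserver<T> {
  private final Object lock = new Object();
  private final Observer<T> outbound;
  private boolean isClosed = false; // guarded by lock

  CloseOnceObserver(Observer<T> outbound) {
    this.outbound = outbound;
  }

  void onError(Throwable t) {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        outbound.onError(t);
      }
    }
  }

  void onCompleted() {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        outbound.onCompleted();
      }
    }
  }

  /** Safe to call at any time, including after onError or onCompleted. */
  void terminate(Throwable terminationException) {
    onError(terminationException);
  }
}
```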
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831875706 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); + protected synchronized boolean hasReceivedShutdownSignal() { +return isShutdown; } - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); -} - -return requestObserver; + /** Send a request to the server. */ + protected final synchronized void send(RequestT request) + throws StreamClosedException, WindmillStreamShutdownException { +debugMetrics.recordSend(); +requestObserver.onNext(request); } - /** Send a request to the server. */ - protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); + @Override + public final void start() { +boolean shouldStartStream = false; synchronized (this) { - if (streamClosed.get()) { -throw new IllegalStateException("Send called on a client closed stream."); + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; } +} - requestObserver().onNext(request); +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. 
- requestObserver = requestObserverSupplier.get(); + debugMetrics.recordStart(); + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown while creating new stream.", e); } catch (Exception e) { -LOG.error("Failed to create new stream, retrying: ", e); +logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); -} catch (InterruptedException | IOException i) { + debugMetrics.recordSleep(sleep); + sleeper.sleep(sleep); +} catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info( + "Interrupted during {} creation backoff. The stream will not be created.", + getClass()); + break; +} catch (IOException ioe) { // Keep trying to create the stream. } } } + +// We were never able to start the stream, remove it from the stream registry. Otherwise, it is +// removed when closed. +streamRegistry.remove(this); } - protected final Executor executor() { -return executor; + /** + * Execute the runnable using the {@link #executor} handling the executor being in a shutdown + * state. 
+ */ + protected final void executeSafely(Runnable runnable) { +try { + executor.execute(runnable); +} catch (RejectedExecutionException e) { + logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken); +} } - public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) { -if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) { + public final void maybeSendHealthCheck(Instant lastSendThreshold) { +if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { try { sendHealthCheck(); - } catch (RuntimeException e) { -LOG.debug("Received exception sending health check.", e); + } catch (Exception e) { +logger.debug("Received exception sending health check.", e); } } } - protected abstract void sendHealthCheck(); + protected abstract void sendHealthCheck() + throws WindmillStreamShutdownException, StreamClosedException; - // Care is taken that synchronization on this is unnecessary for all status page information. - // Blocking s
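`executeSafely` relies on `Executor.execute` throwing `RejectedExecutionException` once the underlying `ExecutorService` has been shut down. That is the default abort behavior for `ThreadPoolExecutor`-backed executors, and the teardown path quoted earlier in this thread calls `executor.shutdownNow()`. Below is a stand-alone sketch of the same pattern. The `SafeExecutor` wrapper is hypothetical and returns whether the task was accepted instead of logging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical wrapper mirroring the executeSafely pattern. */
final class SafeExecutor {
  private final ExecutorService executor;

  SafeExecutor(ExecutorService executor) {
    this.executor = executor;
  }

  /** Returns false instead of throwing when the executor has been shut down. */
  boolean executeSafely(Runnable runnable) {
    try {
      executor.execute(runnable);
      return true;
    } catch (RejectedExecutionException e) {
      // The owner is shutting down; dropping the task is expected rather than an error.
      return false;
    }
  }
}
```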
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831869718 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); + protected synchronized boolean hasReceivedShutdownSignal() { +return isShutdown; } - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); -} - -return requestObserver; + /** Send a request to the server. */ + protected final synchronized void send(RequestT request) + throws StreamClosedException, WindmillStreamShutdownException { +debugMetrics.recordSend(); +requestObserver.onNext(request); } - /** Send a request to the server. */ - protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); + @Override + public final void start() { +boolean shouldStartStream = false; synchronized (this) { - if (streamClosed.get()) { -throw new IllegalStateException("Send called on a client closed stream."); + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; } +} - requestObserver().onNext(request); +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. 
- requestObserver = requestObserverSupplier.get(); + debugMetrics.recordStart(); + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown while creating new stream.", e); } catch (Exception e) { -LOG.error("Failed to create new stream, retrying: ", e); +logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); -} catch (InterruptedException | IOException i) { + debugMetrics.recordSleep(sleep); + sleeper.sleep(sleep); +} catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info( + "Interrupted during {} creation backoff. The stream will not be created.", + getClass()); + break; +} catch (IOException ioe) { // Keep trying to create the stream. } } } + +// We were never able to start the stream, remove it from the stream registry. Otherwise, it is +// removed when closed. +streamRegistry.remove(this); } - protected final Executor executor() { -return executor; + /** + * Execute the runnable using the {@link #executor} handling the executor being in a shutdown + * state. + */ + protected final void executeSafely(Runnable runnable) { +try { + executor.execute(runnable); +} catch (RejectedExecutionException e) { + logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken); +} } - public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) { -if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) { + public final void maybeSendHealthCheck(Instant lastSendThreshold) { Review Comment: done
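The `start()`/`startStream()` change above flips a `started` flag under the lock, runs the retry loop outside it, does nothing once the stream is shut down, and gives up (restoring the interrupt flag) if interrupted during backoff. A minimal sketch of that pattern, assuming hypothetical names (`OnceStartableStream`, a `Callable<Void>` connect step, and a fixed backoff in place of `BackOff`/`Sleeper`); it is not Beam's implementation:

```java
import java.util.concurrent.Callable;

/** Hypothetical stream: start() runs the connect loop at most once, and never after shutdown(). */
class OnceStartableStream {
  private final Callable<Void> connect; // stands in for creating the underlying RPC stream
  private final long backoffMillis; // fixed backoff; the real code uses a BackOff policy
  private boolean started; // guarded by this
  private boolean shutdown; // guarded by this
  private boolean connected; // guarded by this
  private int attempts; // guarded by this

  OnceStartableStream(Callable<Void> connect, long backoffMillis) {
    this.connect = connect;
    this.backoffMillis = backoffMillis;
  }

  void start() {
    boolean shouldStart = false;
    synchronized (this) {
      // Flip the flag under the lock, but run the (possibly slow) connect loop outside it.
      if (!shutdown && !started) {
        started = true;
        shouldStart = true;
      }
    }
    if (shouldStart) {
      connectWithRetries();
    }
  }

  synchronized void shutdown() {
    shutdown = true;
  }

  synchronized boolean isConnected() {
    return connected;
  }

  synchronized int attempts() {
    return attempts;
  }

  private void connectWithRetries() {
    while (true) {
      synchronized (this) {
        if (shutdown) {
          return; // shutdown() owns cleanup; stop retrying
        }
        attempts++;
      }
      try {
        connect.call();
        synchronized (this) {
          connected = true;
        }
        return;
      } catch (Exception e) {
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt for the caller
          return; // give up: the stream will not be created
        }
      }
    }
  }
}
```

Calling `start()` twice, or after `shutdown()`, is a no-op, which is the "called once in the lifetime of the stream" contract described in the class Javadoc of the same PR.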
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831869554 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -49,46 +44,52 @@ * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a * broken stream. * - * Subclasses should override onResponse to handle responses from the server, and onNewStream to - * perform any work that must be done when a new stream is created, such as sending headers or - * retrying requests. + * Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server, + * and {@link #onNewStream()} to perform any work that must be done when a new stream is created, + * such as sending headers or retrying requests. * - * send and startStream should not be called from onResponse; use executor() instead. + * {@link #send(RequestT)} and {@link #startStream()} should not be called from {@link + * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead. * * Synchronization on this is used to synchronize the gRpc stream state and internal data * structures. Since grpc channel operations may block, synchronization on this stream may also * block. This is generally not a problem since streams are used in a single-threaded manner. * However, some accessors used for status page and other debugging need to take care not to require * synchronizing on this. + * + * {@link #start()} and {@link #shutdown()} are called once in the lifetime of the stream. Once + * {@link #shutdown()}, a stream in considered invalid and cannot be restarted/reused. 
*/ public abstract class AbstractWindmillStream implements WindmillStream { - public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300; // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce // per-chunk overhead, and small enough that we can still perform granular flow-control. protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20; - private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class); - protected final AtomicBoolean clientClosed; - private final AtomicBoolean isShutdown; - private final AtomicLong lastSendTimeMs; - private final Executor executor; + // Indicates that the logical stream has been half-closed and is waiting for clean server + // shutdown. + private static final Status OK_STATUS = Status.fromCode(Status.Code.OK); + private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response"; + protected final Sleeper sleeper; + + private final Logger logger; + private final ExecutorService executor; private final BackOff backoff; - private final AtomicLong startTimeMs; - private final AtomicLong lastResponseTimeMs; - private final AtomicInteger errorCount; - private final AtomicReference lastError; - private final AtomicReference lastErrorTime; - private final AtomicLong sleepUntil; private final CountDownLatch finishLatch; private final Set> streamRegistry; private final int logEveryNStreamFailures; - private final Supplier> requestObserverSupplier; - // Indicates if the current stream in requestObserver is closed by calling close() method - private final AtomicBoolean streamClosed; private final String backendWorkerToken; - private @Nullable StreamObserver requestObserver; + private final ResettableThrowingStreamObserver requestObserver; + private final StreamDebugMetrics debugMetrics; + protected volatile boolean clientClosed; Review Comment: done
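The revised class Javadoc says work triggered from `onResponse` should go through `executeSafely(Runnable)`, which tolerates an executor that has already been shut down. A sketch of that helper, assuming a plain `java.util.concurrent.ExecutorService` and a hypothetical `SafeExecutor` wrapper. Unlike the method in the diff, which returns `void` and logs at debug level, this version returns a boolean so the outcome is visible:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical wrapper mirroring executeSafely(): tolerate an executor that was already shut down. */
final class SafeExecutor {
  private final ExecutorService executor;

  SafeExecutor(ExecutorService executor) {
    this.executor = executor;
  }

  /** Returns false (instead of throwing) when the executor rejects the task. */
  boolean executeSafely(Runnable runnable) {
    try {
      executor.execute(runnable);
      return true;
    } catch (RejectedExecutionException e) {
      // Expected after teardown shuts the executor down; the task is intentionally dropped.
      return false;
    }
  }
}
```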
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831723961 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -314,24 +417,35 @@ private Batcher() { @Override public boolean commitWorkItem( String computation, WorkItemCommitRequest commitRequest, Consumer onDone) { - if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { + if (!canAccept(commitRequest.getSerializedSize() + computation.length()) + || hasReceivedShutdownSignal()) { return false; } - PendingRequest request = new PendingRequest(computation, commitRequest, onDone); + + PendingRequest request = PendingRequest.create(computation, commitRequest, onDone); add(idGenerator.incrementAndGet(), request); return true; } /** Flushes any pending work items to the wire. */ @Override public void flush() { - flushInternal(queue); - queuedBytes = 0; - queue.clear(); + try { +if (!hasReceivedShutdownSignal()) { Review Comment: done
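A sketch of the "refuse work after shutdown" checks in `commitWorkItem()` and `flush()`, assuming a hypothetical single-threaded `ShutdownAwareBatcher` that batches strings and a byte budget in place of `canAccept(...)`. The diff is cut off after `if (!hasReceivedShutdownSignal()) {`, so clearing the queue in a `finally` block is an assumption, not necessarily what the PR does:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, single-threaded batcher shaped like the commit Batcher in the diff. */
final class ShutdownAwareBatcher {
  private final long maxBytes;
  private final Consumer<List<String>> sender; // stands in for writing a batch to the wire
  private final List<String> queue = new ArrayList<>();
  private long queuedBytes;
  private volatile boolean shutdown;

  ShutdownAwareBatcher(long maxBytes, Consumer<List<String>> sender) {
    this.maxBytes = maxBytes;
    this.sender = sender;
  }

  /** Like commitWorkItem(): false means "not accepted"; the caller must flush or fail the item. */
  boolean add(String item) {
    if (queuedBytes + item.length() > maxBytes || shutdown) {
      return false;
    }
    queue.add(item);
    queuedBytes += item.length();
    return true;
  }

  /** Sends queued items unless shut down; local state is cleared either way (assumption). */
  void flush() {
    try {
      if (!shutdown) {
        sender.accept(new ArrayList<>(queue));
      }
    } finally {
      queue.clear();
      queuedBytes = 0;
    }
  }

  void shutdown() {
    shutdown = true;
  }

  int queued() {
    return queue.size();
  }
}
```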
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831723273 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -314,24 +417,35 @@ private Batcher() { @Override public boolean commitWorkItem( String computation, WorkItemCommitRequest commitRequest, Consumer onDone) { - if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { + if (!canAccept(commitRequest.getSerializedSize() + computation.length()) + || hasReceivedShutdownSignal()) { Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831722466 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -156,29 +163,44 @@ public void sendHealthCheck() { protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); -RuntimeException finalException = null; +CommitCompletionException failures = new CommitCompletionException(); Review Comment: done
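The diff replaces a single `RuntimeException finalException` with a `CommitCompletionException failures` collector, so one failing completion callback does not hide the others. A sketch of that idea using the standard `Throwable.addSuppressed`; the class name `CompletionFailures` and the use of suppressed exceptions are assumptions, since the internals of the PR's `CommitCompletionException` are not shown here:

```java
import java.util.List;

/** Hypothetical exception that aggregates per-item callback failures instead of keeping one. */
class CompletionFailures extends RuntimeException {
  private static final long serialVersionUID = 1L;

  CompletionFailures() {
    super("One or more completion callbacks failed.");
  }

  /** Runs every callback, recording failures so that later callbacks still run. */
  static void runAll(List<Runnable> callbacks) {
    CompletionFailures failures = new CompletionFailures();
    for (Runnable callback : callbacks) {
      try {
        callback.run();
      } catch (RuntimeException e) {
        failures.addSuppressed(e); // keep every failure, not just the first or last
      }
    }
    if (failures.getSuppressed().length > 0) {
      throw failures;
    }
  }
}
```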
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831720486 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -156,29 +163,44 @@ public void sendHealthCheck() { protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); -RuntimeException finalException = null; +CommitCompletionException failures = new CommitCompletionException(); for (int i = 0; i < response.getRequestIdCount(); ++i) { long requestId = response.getRequestId(i); if (requestId == HEARTBEAT_REQUEST_ID) { continue; } - PendingRequest done = pending.remove(requestId); - if (done == null) { -LOG.error("Got unknown commit request ID: {}", requestId); + PendingRequest pendingRequest = pending.remove(requestId); + CommitStatus commitStatus = + i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK; + if (pendingRequest == null) { +if (!hasReceivedShutdownSignal()) { + // Skip responses when the stream is shutdown since they are now invalid. Review Comment: done
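The response loop above pairs the i-th request ID with the i-th status, treats a missing status as `OK`, and only reports unknown request IDs while the stream has not been shut down. A hypothetical sketch of that loop, with an `enum` standing in for `CommitStatus`, a `Consumer` per pending request, and `-1` as an assumed heartbeat ID (the real value of `HEARTBEAT_REQUEST_ID` is not shown):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical response handler mirroring the loop in onResponse(). */
class CommitResponseHandler {
  enum Status { OK, ABORTED } // placeholder for CommitStatus

  static final long HEARTBEAT_ID = -1; // assumed heartbeat request ID

  final Map<Long, Consumer<Status>> pending = new HashMap<>();
  volatile boolean shutdown;
  int unknownIds; // counts IDs the real code would log as errors

  void onResponse(List<Long> requestIds, List<Status> statuses) {
    for (int i = 0; i < requestIds.size(); ++i) {
      long id = requestIds.get(i);
      if (id == HEARTBEAT_ID) {
        continue;
      }
      Consumer<Status> onDone = pending.remove(id);
      // A missing status entry is treated as OK, as in the diff.
      Status status = i < statuses.size() ? statuses.get(i) : Status.OK;
      if (onDone == null) {
        if (!shutdown) {
          unknownIds++; // only a live stream reports unknown IDs
        }
        continue; // after shutdown, stale responses are ignored
      }
      onDone.accept(status);
    }
  }
}
```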
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831708278 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); + protected synchronized boolean hasReceivedShutdownSignal() { +return isShutdown; } - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); -} - -return requestObserver; + /** Send a request to the server. */ + protected final synchronized void send(RequestT request) + throws StreamClosedException, WindmillStreamShutdownException { +debugMetrics.recordSend(); +requestObserver.onNext(request); } - /** Send a request to the server. */ - protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); + @Override + public final void start() { +boolean shouldStartStream = false; synchronized (this) { - if (streamClosed.get()) { -throw new IllegalStateException("Send called on a client closed stream."); + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; } +} - requestObserver().onNext(request); +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. 
- requestObserver = requestObserverSupplier.get(); + debugMetrics.recordStart(); + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown while creating new stream.", e); Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831713003 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -121,32 +130,49 @@ public static GrpcGetDataStream create( int streamingRpcBatchLimit, boolean sendKeyedGetDataRequests, Consumer> processHeartbeatResponses) { -GrpcGetDataStream getDataStream = -new GrpcGetDataStream( -backendWorkerToken, -startGetDataRpcFn, -backoff, -streamObserverFactory, -streamRegistry, -logEveryNStreamFailures, -getDataThrottleTimer, -jobHeader, -idGenerator, -streamingRpcBatchLimit, -sendKeyedGetDataRequests, -processHeartbeatResponses); -getDataStream.startStream(); -return getDataStream; +return new GrpcGetDataStream( +backendWorkerToken, +startGetDataRpcFn, +backoff, +streamObserverFactory, +streamRegistry, +logEveryNStreamFailures, +getDataThrottleTimer, +jobHeader, +idGenerator, +streamingRpcBatchLimit, +sendKeyedGetDataRequests, +processHeartbeatResponses); + } + + private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) { +return new WindmillStreamShutdownException( +"Stream was closed when attempting to send " + batch.requestsCount() + " requests."); + } + + private static WindmillStreamShutdownException shutdownExceptionFor(QueuedRequest request) { +return new WindmillStreamShutdownException( +"Cannot send request=[" + request + "] on closed stream."); + } + + private void sendIgnoringClosed(StreamingGetDataRequest getDataRequest) + throws WindmillStreamShutdownException { +try { + send(getDataRequest); +} catch (StreamClosedException e) { + // Stream was closed on send, will be retried on stream restart. 
+} } @Override - protected synchronized void onNewStream() { + protected synchronized void onNewStream() + throws StreamClosedException, WindmillStreamShutdownException { send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build()); -if (clientClosed.get()) { +if (clientClosed && !hasReceivedShutdownSignal()) { Review Comment: done
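`sendIgnoringClosed` separates two failure modes: a closed physical stream, where the request will be resent after the stream restarts, and an explicit shutdown, which must reach the caller. A sketch with hypothetical checked exceptions (`ClosedOnSendException`, `ShutdownOnSendException`) standing in for `StreamClosedException` and `WindmillStreamShutdownException`:

```java
/** Hypothetical stand-in for StreamClosedException. */
class ClosedOnSendException extends Exception {
  ClosedOnSendException(String message) {
    super(message);
  }
}

/** Hypothetical stand-in for WindmillStreamShutdownException. */
class ShutdownOnSendException extends Exception {
  ShutdownOnSendException(String message) {
    super(message);
  }
}

/** Hypothetical sender shaped like sendIgnoringClosed() in the diff. */
class IgnoringClosedSender {
  interface Sender {
    void send(String request) throws ClosedOnSendException, ShutdownOnSendException;
  }

  private final Sender sender;
  int droppedOnClose; // requests left for the restarted stream to resend

  IgnoringClosedSender(Sender sender) {
    this.sender = sender;
  }

  /** Closed: swallow, since the request is retried on restart. Shutdown: propagate. */
  void sendIgnoringClosed(String request) throws ShutdownOnSendException {
    try {
      sender.send(request);
    } catch (ClosedOnSendException e) {
      droppedOnClose++;
    }
  }
}
```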
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831709979 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); + protected synchronized boolean hasReceivedShutdownSignal() { +return isShutdown; } - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); -} - -return requestObserver; + /** Send a request to the server. */ + protected final synchronized void send(RequestT request) + throws StreamClosedException, WindmillStreamShutdownException { +debugMetrics.recordSend(); +requestObserver.onNext(request); } - /** Send a request to the server. */ - protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); + @Override + public final void start() { +boolean shouldStartStream = false; synchronized (this) { - if (streamClosed.get()) { -throw new IllegalStateException("Send called on a client closed stream."); + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; } +} - requestObserver().onNext(request); +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. 
- requestObserver = requestObserverSupplier.get(); + debugMetrics.recordStart(); + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown while creating new stream.", e); } catch (Exception e) { -LOG.error("Failed to create new stream, retrying: ", e); +logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); -} catch (InterruptedException | IOException i) { + debugMetrics.recordSleep(sleep); + sleeper.sleep(sleep); +} catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info( + "Interrupted during {} creation backoff. The stream will not be created.", Review Comment: we shut down the executor in maybeTeardownStream(), so this would clean up any dangling tasks in the executor; let's shut down the stream. done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1831707027 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -301,39 +343,59 @@ public void appendSpecificHtml(PrintWriter writer) { writer.append("]"); } - private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) { -while (true) { + private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) + throws WindmillStreamShutdownException { +while (!hasReceivedShutdownSignal()) { request.resetResponseStream(); try { queueRequestAndWait(request); return parseFn.parse(request.getResponseStream()); - } catch (CancellationException e) { -// Retry issuing the request since the response stream was cancelled. -continue; + } catch (AppendableInputStream.InvalidInputStreamStateException | CancellationException e) { +handleShutdown(request, e); +if (!(e instanceof CancellationException)) { + throw e; +} } catch (IOException e) { LOG.error("Parsing GetData response failed: ", e); -continue; } catch (InterruptedException e) { Thread.currentThread().interrupt(); +handleShutdown(request, e); throw new RuntimeException(e); } finally { pending.remove(request.id()); } } + +throw new WindmillStreamShutdownException( Review Comment: done
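`issueRequest` now loops only while the stream has not received a shutdown signal, retries when the response stream is cancelled, and throws a shutdown exception once the loop exits. A reduced sketch, assuming a hypothetical `RetryUntilShutdown` helper, a `Supplier` for one attempt, and a `BooleanSupplier` for the shutdown check; the `IOException` and `InterruptedException` branches of the real method are omitted:

```java
import java.util.concurrent.CancellationException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical stand-in for WindmillStreamShutdownException. */
class RequestShutdownException extends Exception {
  RequestShutdownException(String message) {
    super(message);
  }
}

/** Hypothetical retry loop shaped like issueRequest(): retry cancelled attempts until shutdown. */
final class RetryUntilShutdown {
  private RetryUntilShutdown() {}

  static <T> T issue(Supplier<T> attempt, BooleanSupplier isShutdown)
      throws RequestShutdownException {
    while (!isShutdown.getAsBoolean()) {
      try {
        return attempt.get();
      } catch (CancellationException e) {
        // The response stream was cancelled (for example, by a stream restart); try again.
      }
    }
    throw new RequestShutdownException("Stream was shut down before the request completed.");
  }
}
```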
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1830671651 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); + protected synchronized boolean hasReceivedShutdownSignal() { +return isShutdown; } - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); -} - -return requestObserver; + /** Send a request to the server. */ + protected final synchronized void send(RequestT request) + throws StreamClosedException, WindmillStreamShutdownException { +debugMetrics.recordSend(); +requestObserver.onNext(request); } - /** Send a request to the server. */ - protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); + @Override + public final void start() { +boolean shouldStartStream = false; synchronized (this) { - if (streamClosed.get()) { -throw new IllegalStateException("Send called on a client closed stream."); + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; } +} - requestObserver().onNext(request); +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. 
- requestObserver = requestObserverSupplier.get(); + debugMetrics.recordStart(); + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown while creating new stream.", e); Review Comment: // shutdown() is responsible for cleaning up pending requests. ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -49,46 +44,52 @@ * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a * broken stream. * - * Subclasses should override onResponse to handle responses from the server, and onNewStream to - * perform any work that must be done when a new stream is created, such as sending headers or - * retrying requests. + * Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server, + * and {@link #onNewStream()} to perform any work that must be done when a new stream is created, + * such as sending headers or retrying requests. * - * send and startStream should not be called from onResponse; use executor() instead. + * {@link #send(RequestT)} and {@link #startStream()} should not be called from {@link + * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead. * * Synchronization on this is used to synchronize the gRpc stream state and internal data * structures. Since grpc channel operations may block, synchronization on this stream may also * block. This is generally not a problem since streams are used in a single-threaded manner. * However, some accessors used for status page and other debugging need to take care not to require * synchronizing on this. + * + * {@link #start()} and {@link #shutdown()} are called once in the lifetime of the stream. Once + * {@link #shutdown()}, a stream in considered invalid and cannot be restarted/reused. 
*/ public abstract class AbstractWindmillStream implements WindmillStream { - public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300; // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce // per-chunk overhead, and small enough that we can still perform granular flow-control. protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20; - private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class); - protected final AtomicBoolean clientClosed; - private final AtomicBoolean isShutdow
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1830228776 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +/** + * Request observer that allows resetting its internal delegate using the given {@link + * #streamObserverFactory}. + * + * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be + * {@link ThreadSafe}. 
+ */ +@ThreadSafe +@Internal +final class ResettableStreamObserver implements StreamObserver { + private final Supplier> streamObserverFactory; + + @GuardedBy("this") + private @Nullable StreamObserver delegateStreamObserver; + + @GuardedBy("this") + private boolean isPoisoned; + + ResettableStreamObserver(Supplier> streamObserverFactory) { +this.streamObserverFactory = streamObserverFactory; +this.delegateStreamObserver = null; +this.isPoisoned = false; + } + + private synchronized StreamObserver delegate() { +if (isPoisoned) { + throw new WindmillStreamShutdownException("Explicit call to shutdown stream."); +} + +return Preconditions.checkNotNull( +delegateStreamObserver, +"requestObserver cannot be null. Missing a call to startStream() to initialize."); + } + + /** Creates a new delegate to use for future {@link StreamObserver} methods. */ + synchronized void reset() { +if (isPoisoned) { + throw new WindmillStreamShutdownException("Explicit call to shutdown stream."); +} + +delegateStreamObserver = streamObserverFactory.get(); + } + + /** + * Indicates that the request observer should no longer be used. Attempts to perform operations on + * the request observer will throw an {@link WindmillStreamShutdownException}. + */ + synchronized void poison() { +if (!isPoisoned) { + isPoisoned = true; + if (delegateStreamObserver != null) { +delegateStreamObserver.onError( +new WindmillStreamShutdownException("Explicit call to shutdown stream.")); + } Review Comment: done
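`ResettableStreamObserver` lets a stream swap in a fresh delegate on reconnect (`reset()`) and permanently disable itself on shutdown (`poison()`). The sketch below follows the same shape but uses a hypothetical `SimpleObserver` interface instead of gRPC's `StreamObserver`, and `Objects.requireNonNull` instead of Guava's `Preconditions`, so it compiles with the JDK alone:

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Minimal hypothetical stand-in for gRPC's StreamObserver. */
interface SimpleObserver<T> {
  void onNext(T value);

  void onError(Throwable t);
}

/** Hypothetical stand-in for WindmillStreamShutdownException. */
class PoisonedException extends RuntimeException {
  PoisonedException() {
    super("Explicit call to shutdown stream.");
  }
}

/** Sketch of a resettable delegate that becomes permanently unusable once poisoned. */
final class ResettableObserver<T> implements SimpleObserver<T> {
  private final Supplier<SimpleObserver<T>> factory;
  private SimpleObserver<T> delegate; // guarded by this
  private boolean poisoned; // guarded by this

  ResettableObserver(Supplier<SimpleObserver<T>> factory) {
    this.factory = factory;
  }

  private synchronized SimpleObserver<T> delegate() {
    if (poisoned) {
      throw new PoisonedException();
    }
    return Objects.requireNonNull(delegate, "Missing a call to reset() to initialize.");
  }

  /** Swaps in a fresh delegate, for example when the underlying stream is reopened. */
  synchronized void reset() {
    if (poisoned) {
      throw new PoisonedException();
    }
    delegate = factory.get();
  }

  /** Terminates the current delegate once; later calls to any method fail fast. */
  synchronized void poison() {
    if (!poisoned) {
      poisoned = true;
      if (delegate != null) {
        delegate.onError(new PoisonedException());
      }
    }
  }

  @Override
  public void onNext(T value) {
    delegate().onNext(value); // delegate fetched under the lock, invoked outside it
  }

  @Override
  public void onError(Throwable t) {
    delegate().onError(t);
  }
}
```

In this sketch `onNext` fetches the delegate under the lock but calls it outside the lock, which is why the delegates must be thread-safe, the same expectation stated in the `@implNote` above.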
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1830060117 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ## @@ -342,62 +391,88 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept batch.addRequest(request); } if (responsibleForSend) { - if (waitForSendLatch == null) { + if (prevBatch == null) { // If there was not a previous batch wait a little while to improve // batching. -Thread.sleep(1); +sleeper.sleep(1); } else { -waitForSendLatch.await(); +prevBatch.waitForSendOrFailNotification(); } // Finalize the batch so that no additional requests will be added. Leave the batch in the // queue so that a subsequent batch will wait for its completion. - synchronized (batches) { -verify(batch == batches.peekFirst()); + synchronized (shutdownLock) { +if (hasReceivedShutdownSignal()) { + throw shutdownException(batch); +} + +verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send()."); batch.markFinalized(); } - sendBatch(batch.requests()); - synchronized (batches) { -verify(batch == batches.pollFirst()); + trySendBatch(batch); +} else { + // Wait for this batch to be sent before parsing the response. + batch.waitForSendOrFailNotification(); +} + } + + void trySendBatch(QueuedBatch batch) { +try { + sendBatch(batch); + synchronized (shutdownLock) { +if (hasReceivedShutdownSignal()) { + throw shutdownException(batch); +} + +verify( +batch == batches.pollFirst(), +"Sent GetDataStream request batch removed before send() was complete."); } // Notify all waiters with requests in this batch as well as the sender // of the next batch (if one exists). - batch.countDown(); -} else { - // Wait for this batch to be sent before parsing the response. - batch.await(); + batch.notifySent(); +} catch (Exception e) { + // Free waiters if the send() failed. 
+ batch.notifyFailed(); + // Propagate the exception to the calling thread. + throw e; } } - @SuppressWarnings("NullableProblems") - private void sendBatch(List requests) { -StreamingGetDataRequest batchedRequest = flushToBatch(requests); + private void sendBatch(QueuedBatch batch) { +if (batch.isEmpty()) { + return; +} + +// Synchronization of pending inserts is necessary with send to ensure duplicates are not +// sent on stream reconnect. synchronized (this) { - // Synchronization of pending inserts is necessary with send to ensure duplicates are not - // sent on stream reconnect. - for (QueuedRequest request : requests) { + synchronized (shutdownLock) { +// shutdown() clears pending, once the stream is shutdown, prevent values from being added Review Comment: whoops, that was supposed to be done within the lock. done
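`trySendBatch` guarantees that threads waiting on a batch are released whether the send succeeds (`notifySent()`) or throws (`notifyFailed()`), and then rethrows to the sending thread. The removed code used `batch.countDown()`/`batch.await()`, so a `CountDownLatch` plus a failure flag is a plausible model; `NotifyingBatch` and its methods are hypothetical names, not the PR's `QueuedBatch`:

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical batch whose waiters are released whether send() succeeds or fails. */
final class NotifyingBatch {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean failed;

  void notifySent() {
    done.countDown();
  }

  void notifyFailed() {
    failed = true; // written before countDown() so released waiters observe it
    done.countDown();
  }

  /** Blocks until the batch was sent or failed; returns true only if it was sent. */
  boolean waitForSendOrFail() throws InterruptedException {
    done.await();
    return !failed;
  }

  /** Shape of trySendBatch(): always free waiters, then rethrow the failure to the caller. */
  void trySend(Runnable send) {
    try {
      send.run();
      notifySent();
    } catch (RuntimeException e) {
      notifyFailed();
      throw e;
    }
  }
}
```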
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830055450

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ##
@@ -49,22 +49,29 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
 import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@ThreadSafe
 final class GrpcGetDataStream extends AbstractWindmillStream implements GetDataStream {
   private static final Logger LOG = LoggerFactory.getLogger(GrpcGetDataStream.class);
+  private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
+      StreamingGetDataRequest.newBuilder().build();
 
+  /** @implNote {@link QueuedBatch} objects in the queue are is guarded by {@link #shutdownLock} */
   private final Deque batches;
Review Comment:
   done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830054321

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ##
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
       int streamingRpcBatchLimit,
       boolean sendKeyedGetDataRequests,
       Consumer> processHeartbeatResponses) {
-    GrpcGetDataStream getDataStream =
-        new GrpcGetDataStream(
-            backendWorkerToken,
-            startGetDataRpcFn,
-            backoff,
-            streamObserverFactory,
-            streamRegistry,
-            logEveryNStreamFailures,
-            getDataThrottleTimer,
-            jobHeader,
-            idGenerator,
-            streamingRpcBatchLimit,
-            sendKeyedGetDataRequests,
-            processHeartbeatResponses);
-    getDataStream.startStream();
-    return getDataStream;
+    return new GrpcGetDataStream(
+        backendWorkerToken,
+        startGetDataRpcFn,
+        backoff,
+        streamObserverFactory,
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        idGenerator,
+        streamingRpcBatchLimit,
+        sendKeyedGetDataRequests,
+        processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {
+    return new WindmillStreamShutdownException(
+        "Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedRequest request) {
+    return new WindmillStreamShutdownException(
+        "Cannot send request=[" + request + "] on closed stream.");
   }
 
   @Override
   protected synchronized void onNewStream() {
+    if (hasReceivedShutdownSignal()) {
Review Comment:
   done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830053973

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ##
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
       int streamingRpcBatchLimit,
       boolean sendKeyedGetDataRequests,
       Consumer> processHeartbeatResponses) {
-    GrpcGetDataStream getDataStream =
-        new GrpcGetDataStream(
-            backendWorkerToken,
-            startGetDataRpcFn,
-            backoff,
-            streamObserverFactory,
-            streamRegistry,
-            logEveryNStreamFailures,
-            getDataThrottleTimer,
-            jobHeader,
-            idGenerator,
-            streamingRpcBatchLimit,
-            sendKeyedGetDataRequests,
-            processHeartbeatResponses);
-    getDataStream.startStream();
-    return getDataStream;
+    return new GrpcGetDataStream(
+        backendWorkerToken,
+        startGetDataRpcFn,
+        backoff,
+        streamObserverFactory,
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        idGenerator,
+        streamingRpcBatchLimit,
+        sendKeyedGetDataRequests,
+        processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {
Review Comment:
   done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830052780

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java: ##
@@ -201,7 +198,7 @@ private void maybeSendRequestExtension(GetWorkBudget extension) {
   @Override
   protected synchronized void onNewStream() {
     workItemAssemblers.clear();
-    if (!isShutdown()) {
+    if (!hasReceivedShutdownSignal()) {
Review Comment:
   done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830052130

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
Review Comment:
   done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1830042998 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + protected boolean hasReceivedShutdownSignal() { +synchronized (shutdownLock) { + return isShutdown; } - -return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (streamClosed.get()) { + if (hasReceivedShutdownSignal()) { +return; + } + + if (streamClosed) { +// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException) throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { +verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); +debugMetrics.recordSend(); +requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { +if (hasReceivedShutdownSignal()) { + logger.debug("Stream was shutdown during send.", e); + return; +} + +requestObserver.onError(e); + } +} + } + + @Override + public final void start() { +boolean shouldStartStream = false; +synchronized (shutdownLock) { + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; + } +} + +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. 
*/ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + if (hasReceivedShutdownSignal()) { +break; + } + debugMetrics.recordStart(); + streamClosed = false; + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown waiting to start.", e); } catch (Exception e) { -LOG.error("Failed to create new stream, retrying: ", e); +logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); -} catch (InterruptedException | IOException i) { + debugMetrics.recordSleep(sleep); + sleeper.sleep(sleep); +} catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info( + "Interrupted during {} creation backoff. The stream will not be created.", + getClass()); + break; +} catch (IOException ioe) { // Keep trying to create the stream. } } } + +// We were never able to start the stream, remove it from the stream registry. Otherwise, it is +// removed when closed. +streamRegistry.remove(this); } - protected final Executor executor() { -return executor; + /** + * Execute the runnable using the {@link #executor} handling the executor being in a shutdown + * state. 
+ */ + protected final void executeSafely(Runnable runnable) { +try { + executor.execute(runnable); +} catch (RejectedExecutionException e) { + logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken); +} } - public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) { -if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) { + public final void maybeSendHealthCheck(Instant lastSendThreshold) { +if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThresh
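The new start() makes starting idempotent and refuses to start once shutdown() has been called. It also keeps the potentially slow startStream() call outside shutdownLock. Below is a reduced sketch of only that guard. It assumes a plain Object monitor, as in the diff. The class name StartOnceStream and the call counter are hypothetical and exist only so the behavior can be observed.

```java
/** Hypothetical sketch of the start() guard from the diff; not the Beam class. */
final class StartOnceStream {
  private final Object shutdownLock = new Object();
  private boolean isShutdown = false; // guarded by shutdownLock
  private boolean started = false; // guarded by shutdownLock
  private int startStreamCalls = 0; // guarded by this

  void start() {
    boolean shouldStartStream = false;
    synchronized (shutdownLock) {
      if (!isShutdown && !started) {
        started = true;
        shouldStartStream = true;
      }
    }
    // Stream creation (which may retry with backoff) runs outside shutdownLock,
    // so a concurrent shutdown() never waits behind it.
    if (shouldStartStream) {
      startStream();
    }
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  private synchronized void startStream() {
    startStreamCalls++;
  }

  synchronized int startStreamCalls() {
    return startStreamCalls;
  }
}
```

The decision is made under the lock, but the action runs after the lock is released. This is why `shouldStartStream` is a local flag rather than a call made inside the synchronized block.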
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1830042375 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + protected boolean hasReceivedShutdownSignal() { +synchronized (shutdownLock) { + return isShutdown; } - -return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (streamClosed.get()) { + if (hasReceivedShutdownSignal()) { +return; + } + + if (streamClosed) { +// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException) throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { +verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); +debugMetrics.recordSend(); +requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { +if (hasReceivedShutdownSignal()) { + logger.debug("Stream was shutdown during send.", e); + return; +} + +requestObserver.onError(e); + } +} + } + + @Override + public final void start() { +boolean shouldStartStream = false; +synchronized (shutdownLock) { + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; + } +} + +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. 
*/ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + if (hasReceivedShutdownSignal()) { Review Comment: done
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830041423

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
+        return;
+      }
+
+      if (streamClosed) {
Review Comment:
   done
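After this change, send() checks the shutdown signal before it checks streamClosed. A stream that has been shut down drops requests silently. A stream that is only broken still throws IllegalStateException, and callers such as GrpcCommitWorkStream catch that exception and rely on the retry when the stream is reopened. Below is a sketch of that ordering. It assumes a hypothetical OrderedSendStream whose List stands in for requestObserver.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the check order in send(); the list stands in for requestObserver. */
final class OrderedSendStream {
  private final Object shutdownLock = new Object();
  private boolean isShutdown = false; // guarded by shutdownLock
  private boolean streamClosed = false; // guarded by this
  private final List<String> sent = new ArrayList<>(); // guarded by this

  synchronized void send(String request) {
    if (hasReceivedShutdownSignal()) {
      return; // shut down: drop the request silently
    }
    if (streamClosed) {
      // Broken but not shut down: the caller may retry after the stream is reopened.
      throw new IllegalStateException("Send called on a client closed stream.");
    }
    sent.add(request);
  }

  /** Caller pattern from GrpcCommitWorkStream; returns false only when send() threw. */
  boolean trySend(String request) {
    try {
      send(request);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  boolean hasReceivedShutdownSignal() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }

  synchronized void markStreamClosed() {
    streamClosed = true;
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  synchronized int sentCount() {
    return sent.size();
  }
}
```

Once shutdown() has run, the sketch returns before it reaches the streamClosed check, so a send() on a stream that is both closed and shut down does not throw.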
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1829911016 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + protected boolean hasReceivedShutdownSignal() { +synchronized (shutdownLock) { + return isShutdown; } - -return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (streamClosed.get()) { + if (hasReceivedShutdownSignal()) { +return; + } + + if (streamClosed) { +// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException) throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { +verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); +debugMetrics.recordSend(); +requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { +if (hasReceivedShutdownSignal()) { + logger.debug("Stream was shutdown during send.", e); + return; +} + +requestObserver.onError(e); + } +} + } + + @Override + public final void start() { +boolean shouldStartStream = false; +synchronized (shutdownLock) { + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; + } +} + +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. 
*/ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + if (hasReceivedShutdownSignal()) { +break; + } + debugMetrics.recordStart(); + streamClosed = false; + requestObserver.reset(); onNewStream(); - if (clientClosed.get()) { + if (clientClosed) { halfClose(); } return; } + } catch (WindmillStreamShutdownException e) { +logger.debug("Stream was shutdown waiting to start.", e); Review Comment: done
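In this hunk, the startStream() loop only logs a WindmillStreamShutdownException. The shutdown check at the top of the next iteration is what actually breaks out, after which the stream is removed from the registry. The full startStream() hunk earlier in the thread also shows that an interrupt during backoff restores the interrupt flag and gives up. Below is a minimal sketch of that control flow. It uses hypothetical names (RetryingStarter, ShutdownException, Attempt) and a fixed sleep in place of BackOff and Sleeper.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the startStream() retry loop's control flow; not the Beam class. */
final class RetryingStarter {
  /** Stands in for WindmillStreamShutdownException. */
  static final class ShutdownException extends RuntimeException {}

  /** One attempt to (re)create the underlying stream. */
  interface Attempt {
    void run() throws Exception;
  }

  final Set<Object> registry = ConcurrentHashMap.newKeySet();

  /** Returns true if the stream started, false if it gave up because of shutdown or interrupt. */
  boolean startStream(Object stream, BooleanSupplier isShutdown, Attempt attempt, long backoffMs) {
    registry.add(stream);
    while (true) {
      if (isShutdown.getAsBoolean()) {
        break;
      }
      try {
        attempt.run();
        return true; // started: stays registered until the stream is closed
      } catch (ShutdownException e) {
        // Shutdown raced with the attempt; the check at the top of the loop exits.
      } catch (Exception e) {
        try {
          Thread.sleep(backoffMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // keep the interrupt visible to callers
          break;
        }
      }
    }
    registry.remove(stream); // never started: do not leave it registered
    return false;
  }
}
```

The sketch assumes, as in the diff, that a shutdown exception is only thrown after the shutdown flag has been set. Otherwise the loop would keep retrying.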
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
scwhittle commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1829093210 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + protected boolean hasReceivedShutdownSignal() { +synchronized (shutdownLock) { + return isShutdown; } - -return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (streamClosed.get()) { + if (hasReceivedShutdownSignal()) { +return; + } + + if (streamClosed) { +// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException) throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { +verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); Review Comment: I would remove this since it seems likely expensive. 
Instead you could verify with a test:
- set up a requestObserver that blocks until notified
- one thread calls send and starts blocking
- the main test thread calls shutdown() and verifies the method returns
- the main test thread unblocks the requestObserver

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { -return isShutdown.get(); - } - - private StreamObserver requestObserver() { -if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + protected boolean hasReceivedShutdownSignal() { +synchronized (shutdownLock) { + return isShutdown; } - -return requestObserver; } /** Send a request to the server.
*/ protected final void send(RequestT request) { -lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (streamClosed.get()) { + if (hasReceivedShutdownSignal()) { +return; + } + + if (streamClosed) { +// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException) throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { +verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); +debugMetrics.recordSend(); +requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { +if (hasReceivedShutdownSignal()) { + logger.debug("Stream was shutdown during send.", e); + return; +} + +requestObserver.onError(e); + } +} + } + + @Override + public final void start() { +boolean shouldStartStream = false; +synchronized (shutdownLock) { + if (!isShutdown && !started) { +started = true; +shouldStartStream = true; + } +} + +if (shouldStartStream) { + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { - startTimeMs.set(Instant.now().getMillis()); - lastResponseTimeMs.set(0); - streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + if (hasReceivedShutdownSignal()) { Review Comment: this seems racy since shutdown could happen right after; the requestObserver poisoning already handles it below. ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ## @@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long star
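scwhittle's suggested test can be sketched without any gRPC types. The sketch below uses a hypothetical ToyStream that reproduces only the locking shape of send() and shutdown(): send() holds the stream monitor while it calls the observer, and shutdown() takes only shutdownLock. A Consumer stands in for requestObserver. Unlike the suggested test, shutdown() runs on a helper thread with a join timeout, so a deadlock makes the check fail instead of hanging it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical toy stream reproducing only the locking shape of send()/shutdown(). */
final class ToyStream {
  private final Object shutdownLock = new Object();
  private boolean isShutdown = false; // guarded by shutdownLock
  private final Consumer<String> requestObserver;

  ToyStream(Consumer<String> requestObserver) {
    this.requestObserver = requestObserver;
  }

  void send(String request) {
    synchronized (this) {
      synchronized (shutdownLock) {
        if (isShutdown) {
          return;
        }
      }
      requestObserver.accept(request); // may block; shutdownLock is NOT held here
    }
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  boolean isShutdown() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }
}

final class ShutdownDuringBlockedSend {
  /** Returns true if shutdown() completed while a send() was blocked in the observer. */
  static boolean run() throws InterruptedException {
    CountDownLatch inOnNext = new CountDownLatch(1);
    CountDownLatch unblock = new CountDownLatch(1);
    ToyStream stream =
        new ToyStream(
            request -> {
              inOnNext.countDown(); // send() is now inside the observer
              try {
                unblock.await(); // simulate an onNext() that blocks until released
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    Thread sender = new Thread(() -> stream.send("request"));
    sender.start();
    inOnNext.await();

    Thread shutdownThread = new Thread(stream::shutdown);
    shutdownThread.start();
    shutdownThread.join(TimeUnit.SECONDS.toMillis(5));
    boolean shutdownReturned = !shutdownThread.isAlive() && stream.isShutdown();

    unblock.countDown(); // release the blocked send()
    sender.join();
    return shutdownReturned;
  }
}
```

A test like this checks the property that the removed verify(!Thread.holdsLock(shutdownLock), ...) call was guarding, without paying for the check on every send.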
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on PR #32774:
URL: https://github.com/apache/beam/pull/32774#issuecomment-2451099981

back to you @scwhittle thank you!
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1824900354 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import com.google.auto.value.AutoValue; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.joda.time.DateTime; +import org.joda.time.Instant; + +/** Records stream metrics for debugging. 
*/ +@ThreadSafe +final class StreamDebugMetrics { + private final AtomicInteger restartCount = new AtomicInteger(); + private final AtomicInteger errorCount = new AtomicInteger(); + + @GuardedBy("this") + private long sleepUntil = 0; + + @GuardedBy("this") + private String lastRestartReason = ""; + + @GuardedBy("this") + private DateTime lastRestartTime = null; + + @GuardedBy("this") + private long lastResponseTimeMs = 0; + + @GuardedBy("this") + private long lastSendTimeMs = 0; + + @GuardedBy("this") + private long startTimeMs = 0; + + @GuardedBy("this") + private DateTime shutdownTime = null; + + private static long debugDuration(long nowMs, long startMs) { +return startMs <= 0 ? -1 : Math.max(0, nowMs - startMs); + } + + private static long nowMs() { +return Instant.now().getMillis(); + } + + synchronized void recordSend() { +lastSendTimeMs = nowMs(); + } + + synchronized void recordStart() { +startTimeMs = nowMs(); +lastResponseTimeMs = 0; + } + + synchronized void recordResponse() { +lastResponseTimeMs = nowMs(); + } + + synchronized void recordRestartReason(String error) { +lastRestartReason = error; +lastRestartTime = DateTime.now(); + } + + synchronized long startTimeMs() { Review Comment: done
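In StreamDebugMetrics, debugDuration() returns -1 for a timestamp that was never recorded and clamps negative spans to 0. recordStart() resets lastResponseTimeMs, so a freshly started stream reports that no response has arrived yet. Below is a reduced sketch of those semantics, with the clock passed in explicitly for testability (the real class reads Instant.now()). MiniStreamDebugMetrics and its msSince* accessors are hypothetical.

```java
/** Reduced, hypothetical version of StreamDebugMetrics with an injectable clock. */
final class MiniStreamDebugMetrics {
  private long startTimeMs = 0; // guarded by this
  private long lastResponseTimeMs = 0; // guarded by this

  /** Same formula as StreamDebugMetrics.debugDuration(): -1 means "never recorded". */
  static long debugDuration(long nowMs, long startMs) {
    return startMs <= 0 ? -1 : Math.max(0, nowMs - startMs);
  }

  synchronized void recordStart(long nowMs) {
    startTimeMs = nowMs;
    lastResponseTimeMs = 0; // a freshly (re)started stream has no response yet
  }

  synchronized void recordResponse(long nowMs) {
    lastResponseTimeMs = nowMs;
  }

  synchronized long msSinceStart(long nowMs) {
    return debugDuration(nowMs, startTimeMs);
  }

  synchronized long msSinceLastResponse(long nowMs) {
    return debugDuration(nowMs, lastResponseTimeMs);
  }
}
```

Because 0 doubles as the "unset" value, a restart makes the time since the last response read as -1 rather than as a misleadingly large gap carried over from the previous stream.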
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1824930724 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ## @@ -204,56 +224,64 @@ private void flushInternal(Map requests) { } } - private void issueSingleRequest(final long id, PendingRequest pendingRequest) { + private void issueSingleRequest(long id, PendingRequest pendingRequest) { +if (!prepareForSend(id, pendingRequest)) { + pendingRequest.abort(); + return; +} + StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder(); requestBuilder .addCommitChunkBuilder() -.setComputationId(pendingRequest.computation) +.setComputationId(pendingRequest.computationId()) .setRequestId(id) -.setShardingKey(pendingRequest.request.getShardingKey()) -.setSerializedWorkItemCommit(pendingRequest.request.toByteString()); +.setShardingKey(pendingRequest.shardingKey()) +.setSerializedWorkItemCommit(pendingRequest.serializedCommit()); StreamingCommitWorkRequest chunk = requestBuilder.build(); -synchronized (this) { - pending.put(id, pendingRequest); - try { -send(chunk); - } catch (IllegalStateException e) { -// Stream was broken, request will be retried when stream is reopened. - } +try { + send(chunk); +} catch (IllegalStateException e) { + // Stream was broken, request will be retried when stream is reopened. 
} } private void issueBatchedRequest(Map requests) { +if (!prepareForSend(requests)) { + requests.forEach((ignored, pendingRequest) -> pendingRequest.abort()); + return; +} + StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder(); String lastComputation = null; for (Map.Entry entry : requests.entrySet()) { PendingRequest request = entry.getValue(); StreamingCommitRequestChunk.Builder chunkBuilder = requestBuilder.addCommitChunkBuilder(); - if (lastComputation == null || !lastComputation.equals(request.computation)) { -chunkBuilder.setComputationId(request.computation); -lastComputation = request.computation; + if (lastComputation == null || !lastComputation.equals(request.computationId())) { +chunkBuilder.setComputationId(request.computationId()); +lastComputation = request.computationId(); } - chunkBuilder.setRequestId(entry.getKey()); - chunkBuilder.setShardingKey(request.request.getShardingKey()); - chunkBuilder.setSerializedWorkItemCommit(request.request.toByteString()); + chunkBuilder + .setRequestId(entry.getKey()) + .setShardingKey(request.shardingKey()) + .setSerializedWorkItemCommit(request.serializedCommit()); } StreamingCommitWorkRequest request = requestBuilder.build(); -synchronized (this) { - pending.putAll(requests); - try { -send(request); - } catch (IllegalStateException e) { -// Stream was broken, request will be retried when stream is reopened. - } +try { + send(request); +} catch (IllegalStateException e) { + // Stream was broken, request will be retried when stream is reopened. 
} } - private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) { -checkNotNull(pendingRequest.computation); -final ByteString serializedCommit = pendingRequest.request.toByteString(); + private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) { +if (!prepareForSend(id, pendingRequest)) { + pendingRequest.abort(); + return; +} +checkNotNull(pendingRequest.computationId(), "Cannot commit WorkItem w/o a computationId."); +ByteString serializedCommit = pendingRequest.serializedCommit(); synchronized (this) { - pending.put(id, pendingRequest); for (int i = 0; Review Comment: done
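Each issue*Request() method now calls prepareForSend() first and aborts the request if that call fails. prepareForSend() itself is not shown in this thread. Below is a minimal sketch under two assumptions: prepareForSend() registers the request in pending under shutdownLock unless the stream is already shut down, and shutdown() aborts whatever is still pending. PendingCommits and its Runnable abort callbacks are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of prepareForSend()/abort(); not the GrpcCommitWorkStream code. */
final class PendingCommits {
  private final Object shutdownLock = new Object();
  private boolean isShutdown = false; // guarded by shutdownLock
  private final Map<Long, Runnable> pending = new HashMap<>(); // id -> abort; guarded by shutdownLock

  /** Registers the request as pending unless the stream has already been shut down. */
  boolean prepareForSend(long id, Runnable abort) {
    synchronized (shutdownLock) {
      if (isShutdown) {
        return false;
      }
      pending.put(id, abort);
      return true;
    }
  }

  /** Same shape as issueSingleRequest(): abort right away if the request cannot be registered. */
  void issue(long id, Runnable abort) {
    if (!prepareForSend(id, abort)) {
      abort.run();
      return;
    }
    // ... build the StreamingCommitWorkRequest and send() it here ...
  }

  /** Marks the stream shut down and aborts whatever was still pending (assumed behavior). */
  void shutdown() {
    List<Runnable> toAbort;
    synchronized (shutdownLock) {
      isShutdown = true;
      toAbort = new ArrayList<>(pending.values());
      pending.clear();
    }
    toAbort.forEach(Runnable::run); // run callbacks outside the lock
  }

  int pendingCount() {
    synchronized (shutdownLock) {
      return pending.size();
    }
  }
}
```

Because registration and the shutdown check happen under the same lock, every request is either seen by shutdown() or rejected by prepareForSend(). No request is left pending forever.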
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824914980

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
## @@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long startMs) {

   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-    }
-
-    return requestObserver;
+    return isShutdown;
   }

   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (isShutdown) {
+        return;
+      }
+
+      if (streamClosed) {
+        // TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
         throw new IllegalStateException("Send called on a client closed stream.");
       }

-      requestObserver().onNext(request);
+      try {
+        debugMetrics.recordSend();
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown) {
+          logger.debug("Stream was shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
+    synchronized (shutdownLock) {
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
+      }
+    }
+
+    if (shouldStartStream) {
+      startStream();
     }
   }

   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          if (isShutdown) {
+            break;
+          }
+          debugMetrics.recordStart();
+          streamClosed = false;
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown waiting to start.", e);
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          debugMetrics.recordSleep(sleep);
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during {} creation backoff. The stream will not be created.",
+              getClass());
+          break;
+        } catch (IOException ioe) {
           // Keep trying to create the stream.
         }
       }
     }
+
+    // We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+    // removed when closed.
+    streamRegistry.remove(this);
   }

-  protected final Executor executor() {
-    return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+    try {
+      executor.execute(runnable);
+    } catch (RejectedExecutionException e) {
+      logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+    }
   }

-  public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-    if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+    if (!clientClosed && debugMetrics.lastSendTimeMs() < lastSendThreshold.getMillis()) {
       try {
         sendHealthCheck();
       } catch (RuntimeException e) {
-        LOG.debug("Received exception sending health check.", e);
+        logger.debug("Received exception sending health check.", e);
       }
     }
   }

   protected abstract void sendH
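The new startStream() loop retries stream creation with backoff, stops if the stream has been shut down, restores the interrupt flag and gives up if the backoff sleep is interrupted, and deregisters the stream when it never started. The sketch below mimics that control flow under stated assumptions: RetryingStarter, StreamOpener, the local Sleeper interface and the doubling backoff capped at 1 second are all hypothetical illustrations, not Beam's classes or its actual backoff policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a startStream()-style retry loop; names are illustrative only.
class RetryingStarter {
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  interface StreamOpener {
    void open() throws Exception;
  }

  // Stand-in for the stream registry used by status pages.
  static final Set<RetryingStarter> REGISTRY = ConcurrentHashMap.newKeySet();
  static final long MAX_BACKOFF_MS = 1_000;

  private final Sleeper sleeper;
  private boolean isShutdown = false;
  private long backoffMs = 10;

  RetryingStarter(Sleeper sleeper) {
    this.sleeper = sleeper;
  }

  synchronized void shutdown() {
    isShutdown = true;
  }

  // Returns true once the stream opens; false if shut down or interrupted first.
  boolean start(StreamOpener opener) {
    REGISTRY.add(this);
    while (true) {
      try {
        synchronized (this) {
          if (isShutdown) {
            break;
          }
          opener.open();
          return true; // stays registered; would be removed when the stream is closed
        }
      } catch (Exception e) {
        try {
          sleeper.sleep(backoffMs);
          backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); // capped exponential backoff
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
          break;
        }
      }
    }
    // Never started: deregister so the registry does not keep a dead stream.
    REGISTRY.remove(this);
    return false;
  }

  public static void main(String[] args) {
    List<Long> sleeps = new ArrayList<>();
    int[] attempts = {0};
    RetryingStarter starter = new RetryingStarter(sleeps::add);
    boolean started =
        starter.start(
            () -> {
              if (++attempts[0] < 3) {
                throw new IllegalStateException("not ready");
              }
            });
    System.out.println(started + " " + sleeps);
  }
}
```

Running main prints `true [10, 20]`: two failed attempts, each followed by a backoff sleep, then a successful open. Injecting the sleeper, as the diff does by calling `sleeper.sleep(sleep)` instead of `Thread.sleep(sleep)`, is what lets such a loop be exercised without real delays.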
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824906796

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
## @@ -49,46 +44,64 @@ * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
  * broken stream.
  *
- * Subclasses should override onResponse to handle responses from the server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as sending headers or
- * retrying requests.
+ * Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
+ * such as sending headers or retrying requests.
  *
- * send and startStream should not be called from onResponse; use executor() instead.
+ * {@link #send(RequestT)} and {@link #startStream()} should not be called from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * Synchronization on this is used to synchronize the gRpc stream state and internal data
  * structures. Since grpc channel operations may block, synchronization on this stream may also
  * block. This is generally not a problem since streams are used in a single-threaded manner.
  * However, some accessors used for status page and other debugging need to take care not to require
  * synchronizing on this.
+ *
+ * {@link #start()} and {@link #shutdown()} are called once in the lifetime of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be restarted/reused.
  */
 public abstract class AbstractWindmillStream implements WindmillStream {
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} in the same context,

Review Comment:
done
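The class Javadoc above says start() and shutdown() each take effect once and that a shut-down stream cannot be restarted, with shutdownLock guarding that transition but not held while doing IO. A minimal sketch of the contract follows. It assumes a shutdown() that mirrors the quoted start() (the shutdown() body is not part of this hunk), and the counters in the hypothetical LifecycleSketch class stand in for the real start and teardown work.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the documented start()/shutdown() contract; the shutdown() body is an
// assumption modeled on the quoted start().
class LifecycleSketch {
  private final Object shutdownLock = new Object();
  private boolean started = false;
  private boolean isShutdown = false;
  final AtomicInteger streamStarts = new AtomicInteger();
  final AtomicInteger shutdownsPerformed = new AtomicInteger();

  public final void start() {
    boolean shouldStartStream = false;
    synchronized (shutdownLock) {
      if (!isShutdown && !started) {
        started = true;
        shouldStartStream = true;
      }
    }
    // The (possibly blocking) start work runs outside shutdownLock.
    if (shouldStartStream) {
      streamStarts.incrementAndGet();
    }
  }

  public final void shutdown() {
    boolean shouldShutdown = false;
    synchronized (shutdownLock) {
      if (!isShutdown) {
        isShutdown = true;
        shouldShutdown = true;
      }
    }
    // Teardown work would also run outside the lock.
    if (shouldShutdown) {
      shutdownsPerformed.incrementAndGet();
    }
  }

  public static void main(String[] args) {
    LifecycleSketch stream = new LifecycleSketch();
    stream.start();
    stream.start(); // no-op: already started
    stream.shutdown();
    stream.shutdown(); // no-op: already shut down
    stream.start(); // no-op: a shut-down stream cannot be restarted
    System.out.println(stream.streamStarts.get() + " " + stream.shutdownsPerformed.get());
  }
}
```

Running main prints `1 1`. Deciding under the lock and acting after releasing it is what keeps the lock free of IO, as the @implNote asks.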
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824913419

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
## @@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long startMs) {
Re: [PR] add shutdown and start mechanics to windmill streams [beam]
m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824910797

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
## @@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long startMs) {

   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {

Review Comment:
done
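isShutdown() is what lets the stream tell an intentional shutdown apart from an unexpected closure. In the send() diff quoted earlier in this thread, send() returns quietly once the stream is shut down but still throws IllegalStateException on a closed stream, and executeSafely() swallows the RejectedExecutionException raised by a shut-down executor. The sketch below reproduces those two behaviours with hypothetical names: ShutdownAwareSender, a String request type, and an executeSafely that returns a boolean instead of logging. It is an illustration, not the AbstractWindmillStream implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of shutdown-tolerant operations; not the actual AbstractWindmillStream code.
class ShutdownAwareSender {
  private volatile boolean isShutdown = false; // readable without locking `this`
  private boolean streamClosed = false;
  final List<String> sent = new ArrayList<>();

  protected boolean isShutdown() {
    return isShutdown;
  }

  synchronized void shutdown() {
    isShutdown = true;
  }

  synchronized void closeStream() {
    streamClosed = true;
  }

  synchronized void send(String request) {
    if (isShutdown) {
      return; // explicit shutdown: drop the request quietly
    }
    if (streamClosed) {
      throw new IllegalStateException("Send called on a client closed stream.");
    }
    sent.add(request); // stand-in for requestObserver.onNext(request)
  }

  // Returns false instead of throwing when the executor has been shut down.
  static boolean executeSafely(Executor executor, Runnable runnable) {
    try {
      executor.execute(runnable);
      return true;
    } catch (RejectedExecutionException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    ShutdownAwareSender sender = new ShutdownAwareSender();
    sender.send("a");
    sender.shutdown();
    sender.send("b"); // dropped, no exception
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.shutdown();
    System.out.println(sender.sent + " " + executeSafely(executor, () -> {}));
  }
}
```

Running main prints `[a] false`. Making the flag volatile in this sketch lets status-page style readers call isShutdown() without synchronizing on the stream, which the class Javadoc asks debugging accessors to avoid.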