Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-19 Thread via GitHub


scwhittle merged PR #32774:
URL: https://github.com/apache/beam/pull/32774


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844393236


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      markClosedOrThrow();
-      outboundObserver.onError(t);
+      if (!isClosed) {

Review Comment:
   done
   also caught the illegal state exception in `AbstractWindmillStream#halfClose`






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844329618


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##
@@ -132,6 +132,10 @@ private static Optional tryParseDirectEndpointIntoIpV6Address(
             directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
   }
 
+  public final boolean isEmpty() {
+    return equals(none());

Review Comment:
   removed but made none a singleton






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844356091


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##
@@ -299,24 +303,39 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
+  @CanIgnoreReturnValue
+  private ImmutableList<CompletableFuture<Void>> closeStreamsNotIn(
+      WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    currentBackends.windmillStreams().entrySet().stream()
-        .filter(
-            connectionAndStream ->
-                !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
-        .forEach(
-            entry ->
-                windmillStreamManager.execute(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue())));
+    List<CompletableFuture<Void>> closeStreamFutures =
+        currentBackends.windmillStreams().entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillEndpoints
+                        .windmillEndpoints()
+                        .contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                        windmillStreamManager))
+            .collect(Collectors.toList());
 
     Set newGlobalDataEndpoints =
         new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
-    currentBackends.globalDataStreams().values().stream()
-        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
-        .forEach(
-            sender ->
-                windmillStreamManager.execute(() -> closeStreamSender(sender.endpoint(), sender)));
+    List<CompletableFuture<Void>> closeGlobalDataStreamFutures =
+        currentBackends.globalDataStreams().values().stream()
+            .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
+            .map(
+                sender ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager))
+            .collect(Collectors.toList());
+
+    return ImmutableList.<CompletableFuture<Void>>builder()

Review Comment:
   done
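A minimal sketch of the pattern in the hunk above, assuming plain `String` endpoints and a `Map` in place of `StreamingEngineBackends` (`StreamCloser`, `closeStreamSender`, and `awaitAll` here are hypothetical stand-ins, not Beam's classes). Each close runs on the executor via `CompletableFuture.runAsync`, and returning the futures lets a caller, for example a test, wait for every close with `CompletableFuture.allOf`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the harness; not Beam's FanOutStreamingEngineWorkerHarness. */
class StreamCloser {
  // Endpoints whose streams have been closed; thread-safe because closes run on the executor.
  final Set<String> closed = ConcurrentHashMap.newKeySet();

  private void closeStreamSender(String endpoint) {
    closed.add(endpoint);
  }

  /** Asynchronously closes every stream whose endpoint is not in newEndpoints. */
  List<CompletableFuture<Void>> closeStreamsNotIn(
      Map<String, ?> currentStreams, Set<String> newEndpoints, ExecutorService executor) {
    return currentStreams.keySet().stream()
        .filter(endpoint -> !newEndpoints.contains(endpoint))
        .map(endpoint -> CompletableFuture.runAsync(() -> closeStreamSender(endpoint), executor))
        .collect(Collectors.toList());
  }

  /** Blocks until every close has finished. */
  static void awaitAll(List<CompletableFuture<Void>> futures) {
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
  }
}
```

Since the PR marks `closeStreamsNotIn` with `@CanIgnoreReturnValue`, production callers can presumably keep treating the closes as fire-and-forget, while tests join on the returned futures.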






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844328828


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -104,6 +104,10 @@ synchronized void poison() {
 }
   }
 
+  synchronized boolean hasReceivedPoisonPill() {

Review Comment:
   removed






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844337008


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -89,6 +90,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
         throw new StreamObserverCancelledException("StreamObserver was terminated.");
       }
 
+      // We close under "lock", so this should never happen.

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844336841


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
         // If the delegate above was already terminated via onError or onComplete from another
         // thread.
         logger.warn("StreamObserver was previously cancelled.", e);
+      } catch (RuntimeException ignored) {
+        logger.warn("StreamObserver was unexpectedly cancelled.", e);

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844328452


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+
+@RunWith(JUnit4.class)
+public class GrpcCommitWorkStreamTest {
+  private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest";
+  private static final Windmill.JobHeader TEST_JOB_HEADER =
+      Windmill.JobHeader.newBuilder()
+          .setJobId("test_job")
+          .setWorkerId("test_worker")
+          .setProjectId("test_project")
+          .build();
+  private static final String COMPUTATION_ID = "computationId";
+
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  private ManagedChannel inProcessChannel;
+
+  private static Windmill.WorkItemCommitRequest workItemCommitRequest(long value) {
+    return Windmill.WorkItemCommitRequest.newBuilder()
+        .setKey(ByteString.EMPTY)
+        .setShardingKey(value)
+        .setWorkToken(value)
+        .setCacheToken(value)
+        .build();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    Server server =
+        InProcessServerBuilder.forName(FAKE_SERVER_NAME)
+            .fallbackHandlerRegistry(serviceRegistry)
+            .directExecutor()
+            .build()
+            .start();
+
+    inProcessChannel =
+        grpcCleanup.register(
+            InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build());
+    grpcCleanup.register(server);
+    grpcCleanup.register(inProcessChannel);
+  }
+
+  @After
+  public void cleanUp() {
+    inProcessChannel.shutdownNow();
+  }
+
+  private GrpcCommitWorkStream createCommitWorkStream(CommitWorkStreamTestStub testStub) {
+    serviceRegistry.addService(testStub);
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                .build()
+                .createCommitWorkStream(
+                    Clo

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844326533


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")
   @Override
-  public void shutdown() {
-    if (isShutdown.compareAndSet(false, true)) {
-      requestObserver()
-          .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+  public final void shutdown() {
+    // Don't lock on "this" before poisoning the request observer since otherwise the observer may
+    // be blocking in send().
+    requestObserver.poison();
+    synchronized (this) {
+      if (!isShutdown) {
+        isShutdown = true;
+        debugMetrics.recordShutdown();
+        shutdownInternal();
+      }
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
-  }
-
-  public static class WindmillStreamShutdownException extends RuntimeException {
-    public WindmillStreamShutdownException(String message) {
-      super(message);
+  protected abstract void shutdownInternal();
+
+  /** Returns true if the stream was torn down and should not be restarted internally. */
+  private synchronized boolean maybeTearDownStream() {
+    if (requestObserver.hasReceivedPoisonPill()
Review Comment:
   done, realized we weren't calling `break` on the `WindmillStreamShutdownException` in `startStream()`
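A minimal sketch of the fix described in this reply, assuming a simplified retry loop: transient failures are retried, but a shutdown exception ends the loop with `break` instead of looping back to recreate a stream that was shut down on purpose. The exception class is a local stand-in named after the PR's `WindmillStreamShutdownException`; `StreamStarter` and `Attempt` are hypothetical.

```java
/** Local stand-in named after the PR's exception; not Beam's class. */
class WindmillStreamShutdownException extends RuntimeException {
  WindmillStreamShutdownException(String message) {
    super(message);
  }
}

/** Hypothetical, simplified version of a startStream() retry loop. */
class StreamStarter {
  /** One attempt to open the stream; may fail transiently or because of shutdown. */
  interface Attempt {
    void run() throws Exception;
  }

  /** Returns how many attempts were made before the loop exited. */
  static int startStream(Attempt attempt, int maxAttempts) {
    int attempts = 0;
    while (attempts < maxAttempts) {
      attempts++;
      try {
        attempt.run();
        break; // Stream started.
      } catch (WindmillStreamShutdownException e) {
        // Shut down on purpose: retrying would only recreate a dead stream.
        break;
      } catch (Exception e) {
        // Transient failure: try again (real code would also back off and log).
      }
    }
    return attempts;
  }
}
```

The shutdown `catch` has to come before the generic `catch (Exception e)`; otherwise the shutdown exception would be swallowed as a transient failure and retried.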






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1844325871


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1843484534


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")
   @Override
-  public void shutdown() {
-    if (isShutdown.compareAndSet(false, true)) {
-      requestObserver()
-          .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+  public final void shutdown() {
+    // Don't lock on "this" before poisoning the request observer since otherwise the observer may
+    // be blocking in send().
+    requestObserver.poison();
+    synchronized (this) {
+      if (!isShutdown) {
+        isShutdown = true;
+        debugMetrics.recordShutdown();
+        shutdownInternal();
+      }
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
-  }
-
-  public static class WindmillStreamShutdownException extends RuntimeException {
-    public WindmillStreamShutdownException(String message) {
-      super(message);
+  protected abstract void shutdownInternal();
+
+  /** Returns true if the stream was torn down and should not be restarted internally. */
+  private synchronized boolean maybeTearDownStream() {
+    if (requestObserver.hasReceivedPoisonPill()

Review Comment:
   What is this receivedPoisonPill check guarding against?
   It is racy because we poison outside of the synchronized block, so we could have:
   
   T1: notices an unrelated stream failure, passes this check (not poisoned yet), starts calling onNewStream
   T2: calls shutdown, poisons the request observer
   T1: calls requestObserver.reset(), gets an exception due to the poison
   
   Instead of checking the poison here, it seems like we should just handle the exception from reset() failing, as that covers both cases.
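A minimal sketch of the suggestion above, assuming a simplified observer whose `reset()` throws once `poison()` has been called. `PoisonableObserver`, `ObserverPoisonedException`, `RestartableStream`, and `tryRestart` are hypothetical names, not Beam's `ResettableThrowingStreamObserver` or `AbstractWindmillStream`. The restart path skips any "is poisoned?" pre-check and simply treats a failed `reset()` as "being torn down", which also covers the T1/T2 interleaving where the poison lands after a check would have passed.

```java
/** Hypothetical exception thrown by reset() once the observer has been poisoned. */
class ObserverPoisonedException extends Exception {
  ObserverPoisonedException(String message) {
    super(message);
  }
}

/** Hypothetical, simplified resettable observer; not Beam's ResettableThrowingStreamObserver. */
class PoisonableObserver {
  private boolean isPoisoned = false;
  private int resets = 0;

  synchronized void poison() {
    isPoisoned = true;
  }

  /** Swaps in a fresh delegate, or fails if poison() has already been called. */
  synchronized void reset() throws ObserverPoisonedException {
    if (isPoisoned) {
      throw new ObserverPoisonedException("Cannot reset a poisoned observer.");
    }
    resets++;
  }

  synchronized int resets() {
    return resets;
  }
}

/** Hypothetical stream showing the suggested restart path. */
class RestartableStream {
  final PoisonableObserver requestObserver = new PoisonableObserver();
  private boolean isShutdown = false;

  void shutdown() {
    // Poison before taking the lock, mirroring the PR's comment that the observer
    // may otherwise be blocking in send().
    requestObserver.poison();
    synchronized (this) {
      isShutdown = true;
    }
  }

  /** Returns true if a new stream was started, false if the stream is being torn down. */
  synchronized boolean tryRestart() {
    if (isShutdown) {
      return false;
    }
    try {
      // No isPoisoned() pre-check: a failed reset() covers the race where shutdown()
      // poisons the observer after such a check would already have passed.
      requestObserver.reset();
      return true;
    } catch (ObserverPoisonedException e) {
      return false;
    }
  }
}
```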



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -269,31 +312,43 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
 
   @Override
   public final Instant startTime() {
-    return new Instant(startTimeMs.get());
+    return new Instant(debugMetrics.getStartTimeMs());
   }
 
   @Override
   public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")

Review Comment:
   remove suppression



##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-15 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1843416690


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##
@@ -132,6 +132,10 @@ private static Optional tryParseDirectEndpointIntoIpV6Address(
             directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
   }
 
+  public final boolean isEmpty() {
+    return equals(none());

Review Comment:
   How about changing none() to return a singleton instead of building a new instance every time, since we might be calling isEmpty() a lot?
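A minimal sketch of the singleton suggestion, assuming a simplified value class with a list of address strings; `Endpoints` is a hypothetical stand-in, not Beam's `WindmillEndpoints`. `none()` returns one shared instance built at class-initialization time, so an `isEmpty()` implemented as `equals(none())` no longer allocates on every call.

```java
import java.util.List;

/** Hypothetical, simplified endpoints value class; not Beam's WindmillEndpoints. */
final class Endpoints {
  // Built once; none() hands out this instance instead of building a new one per call.
  private static final Endpoints NONE = new Endpoints(List.of());

  private final List<String> addresses;

  Endpoints(List<String> addresses) {
    this.addresses = List.copyOf(addresses);
  }

  static Endpoints none() {
    return NONE;
  }

  /** Compares against the shared instance; no new object is created per call. */
  boolean isEmpty() {
    return equals(none());
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof Endpoints && addresses.equals(((Endpoints) o).addresses);
  }

  @Override
  public int hashCode() {
    return addresses.hashCode();
  }
}
```

Value equality is kept, so an empty instance built elsewhere still reports `isEmpty()` as true even though it is not the singleton itself.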



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -104,6 +104,10 @@ synchronized void poison() {
 }
   }
 
+  synchronized boolean hasReceivedPoisonPill() {

Review Comment:
   nit: how about isPoisoned?
   
   "Poison pill" was a special element we added to queues etc., so using the term here is a little confusing to me.



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      markClosedOrThrow();
-      outboundObserver.onError(t);
+      if (!isClosed) {

Review Comment:
   I think we should have:
   isUserClosed = onError or onCompleted called on this object
   isClosed = onError or onCompleted called on outboundObserver
   Could name them isClosed and isOutboundClosed instead, or something similar, if that's clearer. Would be good to comment above too.
   
   I think that onError/onCompleted should be like:
   
   check(!isUserClosed);
   isUserClosed = true;
   if (!isClosed) {
     outboundObserver.onError(t);
     isClosed = true;
   }
   
   enforcing that only one of them is called. And terminate should be like:
   
   if (!isClosed) {
     outboundObserver.onError(t);
     isClosed = true;
   }
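A runnable sketch of the two-flag scheme suggested above, assuming a minimal recording observer in place of gRPC's `StreamObserver`. `TwoFlagObserver`, `RecordingObserver`, `isOutboundClosed`, and `markUserClosed` are illustrative names, and `IllegalStateException` stands in for whatever `check(...)` would throw.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical outbound observer that records the terminal call it received. */
class RecordingObserver {
  final List<String> events = new ArrayList<>();

  void onError(Throwable t) {
    events.add("error:" + t.getMessage());
  }

  void onCompleted() {
    events.add("completed");
  }
}

/** Sketch of the suggested flags; not Beam's DirectStreamObserver. */
class TwoFlagObserver {
  private final Object lock = new Object();
  private final RecordingObserver outboundObserver;
  // True once onError or onCompleted was called on this object by its owner.
  private boolean isUserClosed = false;
  // True once onError or onCompleted was forwarded to outboundObserver.
  private boolean isOutboundClosed = false;

  TwoFlagObserver(RecordingObserver outboundObserver) {
    this.outboundObserver = outboundObserver;
  }

  void onError(Throwable t) {
    synchronized (lock) {
      markUserClosed();
      if (!isOutboundClosed) {
        outboundObserver.onError(t);
        isOutboundClosed = true;
      }
    }
  }

  void onCompleted() {
    synchronized (lock) {
      markUserClosed();
      if (!isOutboundClosed) {
        outboundObserver.onCompleted();
        isOutboundClosed = true;
      }
    }
  }

  /** Internal teardown: closes the outbound side at most once; does not count as a user close. */
  void terminate(Throwable t) {
    synchronized (lock) {
      if (!isOutboundClosed) {
        outboundObserver.onError(t);
        isOutboundClosed = true;
      }
    }
  }

  private void markUserClosed() {
    // Only one of onError/onCompleted may be called, and only once.
    if (isUserClosed) {
      throw new IllegalStateException("onError or onCompleted already called");
    }
    isUserClosed = true;
  }
}
```

Because `terminate` leaves `isUserClosed` untouched, an internal teardown followed by the owner's own `onCompleted()` is not treated as a double close, while a second user-initiated close still fails fast.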



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
         // If the delegate above was already terminated via onError or onComplete from another
         // thread.
         logger.warn("StreamObserver was previously cancelled.", e);
+      } catch (RuntimeException ignored) {
+        logger.warn("StreamObserver was unexpectedly cancelled.", e);

Review Comment:
   I was thinking above that you should log both e and the currently ignored exception.
   
   Ditto here. Also, this log message is a little confusing; how about
   "encountered error {} when cancelling due to error"?
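One way to act on this, sketched with `java.util.logging` to stay dependency-free (the `logger` in the quoted code is not shown, so its API is not assumed here). Both the original error `e` and the error thrown while cancelling end up in one log record, using the wording suggested above. `CancelOnError`, `Cancellable`, and `cancelQuietly` are hypothetical names, and attaching `e` via `Throwable.addSuppressed` is a swapped-in technique, not something proposed in the thread.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper showing how both exceptions can be kept; not Beam code. */
class CancelOnError {
  private static final Logger LOG = Logger.getLogger(CancelOnError.class.getName());

  /** Something that can be cancelled with a cause, e.g. a stream observer. */
  interface Cancellable {
    void cancel(Throwable cause);
  }

  /**
   * Cancels the delegate because of error {@code e}. If cancelling itself throws, logs the new
   * error with {@code e} attached as suppressed and returns it; otherwise returns null.
   */
  static Throwable cancelQuietly(Cancellable delegate, Throwable e) {
    try {
      delegate.cancel(e);
      return null;
    } catch (RuntimeException cancelError) {
      if (cancelError != e) {
        // addSuppressed rejects self-suppression, hence the identity check.
        cancelError.addSuppressed(e);
      }
      LOG.log(
          Level.WARNING,
          "Encountered error " + cancelError + " when cancelling due to error " + e,
          cancelError);
      return cancelError;
    }
  }
}
```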



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##
@@ -299,24 +303,39 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
+  @CanIgnoreReturnValue
+  private ImmutableList<CompletableFuture<Void>> closeStreamsNotIn(
+      WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    currentBackends.windmillStreams().entrySet().stream()
-        .filter(
-            connectionAndStream ->
-                !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
-        .forEach(
-            entry ->
-                windmillStreamManager.execute(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue())));
+    List<CompletableFuture<Void>> closeStreamFutures =
+        currentBackends.windmillStreams().entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillEndpoints
+                        .windmillEndpoints()
+                        .contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                        windmillStreamManager))
+            .collect(Collectors.toList());
 
     Set newGlobalDataEndpoints =
         new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
-    currentBackends.globalDataStreams().values().stream()
-        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
-        .forEach(
-            sender ->
-                windmillStreamManager.execute(() -> closeStreamSender(sender.endpoint(), sender)));
+    Lis

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-14 Thread via GitHub


m-trieu commented on PR #32774:
URL: https://github.com/apache/beam/pull/32774#issuecomment-2478169104

   back to you @scwhittle thanks!





Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841150008


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserverTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.VerifyException;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DirectStreamObserverTest {
+
+  @Test

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841042910


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -83,16 +85,14 @@ public void onNext(T value) throws StreamObserverCancelledException {
       // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
       // are synchronized after terminating the phaser if we observe that the phaser is not
       // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-      if (currentPhase < 0) return;
+      if (currentPhase < 0) {
+        throw new StreamObserverCancelledException("StreamObserver was terminated.");
+      }

Review Comment:
   done
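A minimal sketch of the behavior change in the hunk above, assuming a `java.util.concurrent.Phaser` with a single registered party stands in for `isReadyNotifier`. `PhaserGatedSender` and `StreamCancelledException` are hypothetical stand-ins, not Beam's `DirectStreamObserver` or `StreamObserverCancelledException`. `Phaser.getPhase()` returns a negative value once the phaser has terminated, and the sketch throws at that point instead of silently returning, so the caller learns the message was not sent.

```java
import java.util.concurrent.Phaser;

/** Local stand-in for the PR's StreamObserverCancelledException; not Beam's class. */
class StreamCancelledException extends RuntimeException {
  StreamCancelledException(String message) {
    super(message);
  }
}

/** Hypothetical sender gated by a Phaser, loosely modeled on the hunk above. */
class PhaserGatedSender {
  // One registered party; terminate() plays the role of isReadyNotifier.forceTermination().
  private final Phaser isReadyNotifier = new Phaser(1);
  private int sent = 0;

  void onNext(String value) {
    // Phaser.getPhase() returns a negative number once the phaser has terminated.
    int currentPhase = isReadyNotifier.getPhase();
    if (currentPhase < 0) {
      // Previously a silent return; throwing tells the caller the message was dropped.
      throw new StreamCancelledException("StreamObserver was terminated.");
    }
    sent++;
  }

  void terminate() {
    isReadyNotifier.forceTermination();
  }

  int sent() {
    return sent;
  }
}
```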






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841039667


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
       // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
       // are synchronized after terminating the phaser if we observe that the phaser is not
       // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-      if (currentPhase < 0) return;
+      if (currentPhase < 0) {
+        throw new StreamObserverCancelledException("StreamObserver was terminated.");
+      }

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841038480


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
       // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
       // are synchronized after terminating the phaser if we observe that the phaser is not
       // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-      if (currentPhase < 0) return;
+      if (currentPhase < 0) {
+        throw new StreamObserverCancelledException("StreamObserver was terminated.");
+      }
       messagesSinceReady = 0;

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841025802


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
 isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onError(t);
 }
   }
 
   @Override
   public void onCompleted() {
+isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onCompleted();
 }
   }
 
+  private void markClosedOrThrow() {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841019493


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java:
##
@@ -102,6 +105,48 @@ private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) {
 return getDataStream;
   }
 
+  @Test
+  public void testRequestKeyedData() {
+GetDataStreamTestStub testStub =
+new GetDataStreamTestStub(new TestGetDataStreamRequestObserver());
+GrpcGetDataStream getDataStream = createGetDataStream(testStub);
+// These will block until they are successfully sent.
+CompletableFuture sendFuture =
+CompletableFuture.supplyAsync(
+() -> {
+  try {
+return getDataStream.requestKeyedData(
+"computationId",
+Windmill.KeyedGetDataRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setShardingKey(1)
+.setCacheToken(1)
+.setWorkToken(1)
+.build());
+  } catch (WindmillStreamShutdownException e) {
+throw new RuntimeException(e);
+  }
+});
+
+// Sleep a bit to allow future to run.
+Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

Review Comment:
   done
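The quoted test sleeps for five seconds to give the future time to start. A common alternative is to have the task count down a latch as soon as it begins. The sketch below assumes the test only needs to know that the blocking call has started; it is not necessarily the change made in this PR, and `startAndAwait` is a hypothetical helper:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class LatchInsteadOfSleep {
  /**
   * Starts a task that blocks until {@code release} is counted down, and returns only once
   * the task is actually running (instead of sleeping for a fixed interval).
   */
  static CompletableFuture<String> startAndAwait(
      ExecutorService executor, CountDownLatch release) {
    CountDownLatch started = new CountDownLatch(1);
    CompletableFuture<String> future =
        CompletableFuture.supplyAsync(
            () -> {
              started.countDown(); // Signal that the task is running.
              try {
                release.await(); // Stands in for a call that blocks until it is sent.
                return "sent";
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
              }
            },
            executor);
    try {
      // Bounded wait so a broken test fails instead of hanging forever.
      if (!started.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Task did not start in time.");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return future;
  }
}
```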






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841019232


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java:
##
@@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn
 assertThrows(
 WindmillStreamShutdownException.class,
 queuedBatch::waitForSendOrFailNotification));
-
+// Wait a few seconds for the above future to get scheduled and run.
+Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841018880


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##
@@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException {
   commitProcessed.countDown();
 });
   }
+} catch (StreamObserverCancelledException ignored) {
 }
 
 // Verify that we sent the commits above in a request + the initial header.
-verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class));
+verify(requestObserver, times(2))
+.onNext(
+argThat(
+request -> {
+  if (request.getHeader().equals(TEST_JOB_HEADER)) {

Review Comment:
   done, used `inOrder`






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841011999


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
 isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onError(t);
 }
   }
 
   @Override
   public void onCompleted() {
+isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onCompleted();
 }
   }
 
+  private void markClosedOrThrow() {
+synchronized (lock) {
+  Preconditions.checkState(!isClosed);

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841012413


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##
@@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {}
 () ->
 assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1)));
 testStream.shutdown();
+
+// Sleep a bit to give sendExecutor time to execute the send().
+Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1841002762


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) {
 
 for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
   AppendableInputStream responseStream = pending.get(chunk.getRequestId(i));
-  verify(responseStream != null, "No pending response stream");
+  synchronized (this) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1840996524


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
 
   try {
 delegate.onError(e);
-  } catch (RuntimeException ignored) {
+  } catch (IllegalStateException ignored) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1840157947


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
 
   try {
 delegate.onError(e);
-  } catch (RuntimeException ignored) {
+  } catch (IllegalStateException ignored) {

Review Comment:
   Add the ignored exception to the exception too? Just in case we catch something unexpected; it would help debugging.
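One way to act on this suggestion, assuming "add the ignored to the exception" means attaching it as a suppressed exception: `Throwable#addSuppressed` keeps the secondary failure visible in the stack trace without replacing the original error. `Delegate` and `closeWithError` are hypothetical names, not the PR's code:

```java
/** Hypothetical downstream observer whose onError may throw if it was already closed. */
interface Delegate {
  void onError(Throwable t);
}

class SuppressedOnError {
  /**
   * Forwards {@code original} to the delegate. If the delegate rejects it with an
   * IllegalStateException (e.g. because it is already closed), that secondary exception is
   * attached to {@code original} instead of being silently dropped.
   */
  static void closeWithError(Delegate delegate, Throwable original) {
    try {
      delegate.onError(original);
    } catch (IllegalStateException onErrorException) {
      original.addSuppressed(onErrorException);
    }
  }
}
```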



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
 isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onError(t);
 }
   }
 
   @Override
   public void onCompleted() {
+isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onCompleted();
 }
   }
 
+  private void markClosedOrThrow() {
+synchronized (lock) {
+  Preconditions.checkState(!isClosed);

Review Comment:
   This is going to throw if we have the sequence:
   
   T1: onNext()
   T2: terminate()
   T1: onError()
   
   It seems like that could happen if we're terminating from threads other than the one generally driving the observer. We could have a separate bool tracking whether the user closed the observer (userClosed), and base this exception on that, since closing twice is using the class wrong. Having a terminate() before or during an onCompleted()/onError() doesn't necessarily seem like misuse, and I think we should avoid throwing an exception there.
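A minimal sketch of the separate-flag idea, assuming `terminate()` only force-terminates the phaser while `onCompleted()`/`onError()` set a caller-owned `userClosed` flag, so that only a second user close (actual misuse) throws. `CloseTrackingObserver` is hypothetical, and a counter stands in for the wrapped gRPC `StreamObserver`:

```java
import java.util.concurrent.Phaser;

class CloseTrackingObserver {
  private final Object lock = new Object();
  private final Phaser isReadyNotifier = new Phaser(1);
  private boolean userClosed = false; // Guarded by lock; set only by onCompleted()/onError().
  private int downstreamCloses = 0; // Guarded by lock; stands in for outboundObserver calls.

  /** May be called from any thread; must not make a later onError()/onCompleted() throw. */
  void terminate() {
    isReadyNotifier.forceTermination();
  }

  void onCompleted() {
    isReadyNotifier.forceTermination(); // No effect if already terminated.
    synchronized (lock) {
      markUserClosedOrThrow();
      downstreamCloses++;
    }
  }

  void onError(Throwable t) {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      markUserClosedOrThrow();
      downstreamCloses++; // The real observer would forward t downstream here.
    }
  }

  // Caller must hold lock. Closing twice is misuse; being terminated first is not.
  private void markUserClosedOrThrow() {
    if (userClosed) {
      throw new IllegalStateException("Observer was already closed by the user.");
    }
    userClosed = true;
  }

  int downstreamCloses() {
    synchronized (lock) {
      return downstreamCloses;
    }
  }
}
```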
   
   
   



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
 isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onError(t);
 }
   }
 
   @Override
   public void onCompleted() {
+isReadyNotifier.forceTermination();
 synchronized (lock) {
-  isClosed = true;
+  markClosedOrThrow();
   outboundObserver.onCompleted();
 }
   }
 
+  private void markClosedOrThrow() {

Review Comment:
   Just make the method synchronized (though with the suggestion below, it's probably easier to just duplicate this in onCompleted/onError and use the same if block).



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) {
 
 for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
   AppendableInputStream responseStream = pending.get(chunk.getRequestId(i));
-  verify(responseStream != null, "No pending response stream");
+  synchronized (this) {

Review Comment:
   Avoid synchronizing if it's not necessary. You also need to handle the null case, or you will just get an exception on the following line.
   
   if (responseStream == null) {
     synchronized (this) { verify(isShutdown); }
     continue;
   }
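The snippet above, expanded into a standalone sketch. It assumes responses can legitimately arrive for ids that were removed from `pending` during shutdown, and that chunks for one stream are processed by a single thread at a time. `PendingResponses`, `onResponse`, and the `StringBuilder` payloads are hypothetical simplifications of `GrpcGetDataStream`'s `AppendableInputStream` bookkeeping:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PendingResponses {
  private final ConcurrentMap<Long, StringBuilder> pending = new ConcurrentHashMap<>();
  private boolean isShutdown = false; // Guarded by this.

  void register(long id) {
    pending.put(id, new StringBuilder());
  }

  synchronized void shutdown() {
    isShutdown = true;
    pending.clear(); // Shutdown drops in-flight requests.
  }

  /** Appends chunk data to each pending request; returns how many ids were handled. */
  int onResponse(long[] requestIds, String data) {
    int handled = 0;
    for (long id : requestIds) {
      StringBuilder responseStream = pending.get(id); // No lock on the common path.
      if (responseStream == null) {
        // Take the lock only on the rare path, and only to read the shutdown state.
        synchronized (this) {
          if (!isShutdown) {
            throw new IllegalStateException("No pending response stream for id " + id);
          }
        }
        continue; // Skip instead of dereferencing null below.
      }
      responseStream.append(data);
      handled++;
    }
    return handled;
  }

  String responseFor(long id) {
    StringBuilder responseStream = pending.get(id);
    return responseStream == null ? null : responseStream.toString();
  }
}
```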



##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##
@@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException {
   commitProcessed.countDown();
 });
   }
+} catch (StreamObserverCancelledException ignored) {
 }
 
 // Verify that we sent the commits above in a request + the initial header.
-verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class));
+verify(requestObserver, times(2))
+.onNext(
+argThat(
+request -> {
+  if (request.getHeader().equals(TEST_JOB_HEADER)) {

Review Comment:
   nit: how about separate onNext verifications? As is, two headers would pass. May need some sequence object, not sure.
   
   .onNext(argThat(r -> r.getHeader().equals(TEST_JOB_HEADER)))
   .onNext(argThat(r -> !r.getCommitChunkList().isEmpty()))
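The point of the nit is that one `times(2)` matcher accepting either a header or a commit chunk also passes for two headers. In Mockito this is usually expressed with an `InOrder` object and one `inOrder.verify(requestObserver).onNext(argThat(...))` per expected request. The plain-Java sketch below does not use Mockito; it only illustrates the same idea of checking each recorded request against its own predicate, in sequence. `RecordedRequest` is a hypothetical simplification of `StreamingCommitWorkRequest`:

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical simplification: a request is either a header or a batch of commit chunks. */
class RecordedRequest {
  final String header; // Null when this is not a header request.
  final List<String> commitChunks;

  RecordedRequest(String header, List<String> commitChunks) {
    this.header = header;
    this.commitChunks = commitChunks;
  }
}

class OrderedVerification {
  /**
   * True only if there are exactly as many requests as predicates and the i-th request
   * satisfies the i-th predicate, so two headers cannot satisfy "header, then commits".
   */
  @SafeVarargs
  static boolean matchesInOrder(
      List<RecordedRequest> requests, Predicate<RecordedRequest>... expected) {
    if (requests.size() != expected.length) {
      return false;
    }
    for (int i = 0; i < expected.length; i++) {
      if (!expected[i].test(requests.get(i))) {
        return false;
      }
    }
    return true;
  }
}
```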



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
   // Phaser is terminated so don't us

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839829444


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+
+@RunWith(JUnit4.class)
+public class GrpcCommitWorkStreamTest {
+  private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest";
+  private static final Windmill.JobHeader TEST_JOB_HEADER =
+  Windmill.JobHeader.newBuilder()
+  .setJobId("test_job")
+  .setWorkerId("test_worker")
+  .setProjectId("test_project")
+  .build();
+  private static final String COMPUTATION_ID = "computationId";
+
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  private ManagedChannel inProcessChannel;
+
+  private static Windmill.WorkItemCommitRequest workItemCommitRequest(long value) {
+return Windmill.WorkItemCommitRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setShardingKey(value)
+.setWorkToken(value)
+.setCacheToken(value)
+.build();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+Server server =
+InProcessServerBuilder.forName(FAKE_SERVER_NAME)
+.fallbackHandlerRegistry(serviceRegistry)
+.directExecutor()
+.build()
+.start();
+
+inProcessChannel =
+grpcCleanup.register(
+InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build());
+grpcCleanup.register(server);
+grpcCleanup.register(inProcessChannel);
+  }
+
+  @After
+  public void cleanUp() {
+inProcessChannel.shutdownNow();
+  }
+
+  private GrpcCommitWorkStream createCommitWorkStream(CommitWorkStreamTestStub testStub) {
+serviceRegistry.addService(testStub);
+GrpcCommitWorkStream commitWorkStream =
+(GrpcCommitWorkStream)
+GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+.build()
+.createCommitWorkStream(
+CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel),
+new 

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839908734


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -342,62 +401,86 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
   batch.addRequest(request);
 }
 if (responsibleForSend) {
-  if (waitForSendLatch == null) {
+  if (prevBatch == null) {
 // If there was not a previous batch wait a little while to improve
 // batching.
-Thread.sleep(1);
+sleeper.sleep(1);
   } else {
-waitForSendLatch.await();
+prevBatch.waitForSendOrFailNotification();
   }
   // Finalize the batch so that no additional requests will be added.  Leave the batch in the
   // queue so that a subsequent batch will wait for its completion.
-  synchronized (batches) {
-verify(batch == batches.peekFirst());
+  synchronized (this) {
+if (isShutdown) {
+  throw shutdownExceptionFor(batch);
+}
+
+verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
 batch.markFinalized();
   }
-  sendBatch(batch.requests());
-  synchronized (batches) {
-verify(batch == batches.pollFirst());
+  trySendBatch(batch);
+} else {
+  // Wait for this batch to be sent before parsing the response.
+  batch.waitForSendOrFailNotification();
+}
+  }
+
+  void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
+try {
+  sendBatch(batch);
+  synchronized (this) {
+if (isShutdown) {
+  throw shutdownExceptionFor(batch);
+}
+
+verify(
+batch == batches.pollFirst(),
+"Sent GetDataStream request batch removed before send() was complete.");
   }
   // Notify all waiters with requests in this batch as well as the sender
   // of the next batch (if one exists).
-  batch.countDown();
-} else {
-  // Wait for this batch to be sent before parsing the response.
-  batch.await();
+  batch.notifySent();
+} catch (Exception e) {
+  // Free waiters if the send() failed.
+  batch.notifyFailed();
+  // Propagate the exception to the calling thread.
+  throw e;
 }
   }
 
-  @SuppressWarnings("NullableProblems")
-  private void sendBatch(List requests) {
-StreamingGetDataRequest batchedRequest = flushToBatch(requests);
+  private void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
+if (batch.isEmpty()) {
+  return;
+}
+
+// Synchronization of pending inserts is necessary with send to ensure duplicates are not
+// sent on stream reconnect.
 synchronized (this) {
-  // Synchronization of pending inserts is necessary with send to ensure duplicates are not
-  // sent on stream reconnect.
-  for (QueuedRequest request : requests) {
+  if (isShutdown) {
+throw shutdownExceptionFor(batch);
+  }
+
+  for (QueuedRequest request : batch.requestsReadOnly()) {
 // Map#put returns null if there was no previous mapping for the key, meaning we have not
 // seen it before.
-verify(pending.put(request.id(), request.getResponseStream()) == null);
+verify(
+pending.put(request.id(), request.getResponseStream()) == null,
+"Request already sent.");
   }
-  try {
-send(batchedRequest);
-  } catch (IllegalStateException e) {
+
+  if (!trySend(batch.asGetDataRequest())) {
 // The stream broke before this call went through; onNewStream will retry the fetch.
-LOG.warn("GetData stream broke before call started.", e);
+LOG.warn("GetData stream broke before call started.");
   }
 }
   }
 
-  @SuppressWarnings("argument")
-  private StreamingGetDataRequest flushToBatch(List requests) {
-// Put all global data requests first because there is only a single repeated field for
-// request ids and the initial ids correspond to global data requests if they are present.
-requests.sort(QueuedRequest.globalRequestsFirst());
-StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder();
-for (QueuedRequest request : requests) {
-  request.addToStreamingGetDataRequest(builder);
-}
-return builder.build();
+  private synchronized void verify(boolean condition, String message) {
+Verify.verify(condition || isShutdown, message);
+  }
+
+  private synchronized boolean isShutdownLocked() {

Review Comment:
   done
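`trySendBatch` in the hunk above releases every waiter on a batch whether the send succeeds (`notifySent()`) or fails (`notifyFailed()`), and waiters learn which outcome occurred from `waitForSendOrFailNotification()`. A minimal sketch of that handshake, assuming a `CountDownLatch` plus a failure flag; `SketchBatch`, `SendHelper`, and `BatchFailedException` are hypothetical and are not the PR's `QueuedBatch`:

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for WindmillStreamShutdownException. */
class BatchFailedException extends Exception {
  BatchFailedException(String message) {
    super(message);
  }
}

class SketchBatch {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean failed = false;

  /** Called by the sending thread after the batch was handed to the stream. */
  void notifySent() {
    done.countDown();
  }

  /** Called by the sending thread if send() threw; releases waiters so they do not hang. */
  void notifyFailed() {
    failed = true; // Written before countDown(), so woken waiters observe it.
    done.countDown();
  }

  /** Blocks until the batch was sent or failed; throws if it failed. */
  void waitForSendOrFailNotification() throws InterruptedException, BatchFailedException {
    done.await();
    if (failed) {
      throw new BatchFailedException("Batch was not sent; the stream may be shut down.");
    }
  }
}

class SendHelper {
  /** Mirrors the try / notifySent / catch / notifyFailed / rethrow shape of trySendBatch. */
  static void trySend(SketchBatch batch, Runnable send) {
    try {
      send.run();
      batch.notifySent();
    } catch (RuntimeException e) {
      batch.notifyFailed();
      throw e; // Propagate to the calling thread.
    }
  }
}
```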




Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839912127


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -342,62 +401,86 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
   batch.addRequest(request);
 }
 if (responsibleForSend) {
-  if (waitForSendLatch == null) {
+  if (prevBatch == null) {
 // If there was not a previous batch wait a little while to improve
 // batching.
-Thread.sleep(1);
+sleeper.sleep(1);
   } else {
-waitForSendLatch.await();
+prevBatch.waitForSendOrFailNotification();
   }
   // Finalize the batch so that no additional requests will be added.  Leave the batch in the
   // queue so that a subsequent batch will wait for its completion.
-  synchronized (batches) {
-verify(batch == batches.peekFirst());
+  synchronized (this) {
+if (isShutdown) {
+  throw shutdownExceptionFor(batch);
+}
+
+verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
 batch.markFinalized();
   }
-  sendBatch(batch.requests());
-  synchronized (batches) {
-verify(batch == batches.pollFirst());
+  trySendBatch(batch);
+} else {
+  // Wait for this batch to be sent before parsing the response.
+  batch.waitForSendOrFailNotification();
+}
+  }
+
+  void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
+try {
+  sendBatch(batch);
+  synchronized (this) {
+if (isShutdown) {
+  throw shutdownExceptionFor(batch);
+}
+
+verify(
+batch == batches.pollFirst(),
+"Sent GetDataStream request batch removed before send() was complete.");
   }
   // Notify all waiters with requests in this batch as well as the sender
   // of the next batch (if one exists).
-  batch.countDown();
-} else {
-  // Wait for this batch to be sent before parsing the response.
-  batch.await();
+  batch.notifySent();
+} catch (Exception e) {
+  // Free waiters if the send() failed.
+  batch.notifyFailed();
+  // Propagate the exception to the calling thread.
+  throw e;
 }
   }
 
-  @SuppressWarnings("NullableProblems")
-  private void sendBatch(List requests) {
-StreamingGetDataRequest batchedRequest = flushToBatch(requests);
+  private void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
+if (batch.isEmpty()) {
+  return;
+}
+
+// Synchronization of pending inserts is necessary with send to ensure duplicates are not
+// sent on stream reconnect.
 synchronized (this) {
-  // Synchronization of pending inserts is necessary with send to ensure duplicates are not
-  // sent on stream reconnect.
-  for (QueuedRequest request : requests) {
+  if (isShutdown) {
+throw shutdownExceptionFor(batch);
+  }
+
+  for (QueuedRequest request : batch.requestsReadOnly()) {
 // Map#put returns null if there was no previous mapping for the key, meaning we have not
 // seen it before.
-verify(pending.put(request.id(), request.getResponseStream()) == null);
+verify(
+pending.put(request.id(), request.getResponseStream()) == null,
+"Request already sent.");
   }
-  try {
-send(batchedRequest);
-  } catch (IllegalStateException e) {
+
+  if (!trySend(batch.asGetDataRequest())) {
 // The stream broke before this call went through; onNewStream will retry the fetch.
-LOG.warn("GetData stream broke before call started.", e);
+LOG.warn("GetData stream broke before call started.");
   }
 }
   }
 
-  @SuppressWarnings("argument")
-  private StreamingGetDataRequest flushToBatch(List requests) {
-// Put all global data requests first because there is only a single repeated field for
-// request ids and the initial ids correspond to global data requests if they are present.
-requests.sort(QueuedRequest.globalRequestsFirst());
-StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder();
-for (QueuedRequest request : requests) {
-  request.addToStreamingGetDataRequest(builder);
-}
-return builder.build();
+  private synchronized void verify(boolean condition, String message) {

Review Comment:
   done
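The `verify` helper in the hunk above treats an invariant violation as an error only while the stream is still running, since shutdown can clear `batches` and `pending` concurrently with a send. A minimal sketch of the same idea without Guava, assuming a plain `IllegalStateException` in place of Guava's `VerifyException`; `ShutdownTolerantChecks` and `completeSend` are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ShutdownTolerantChecks {
  private final Deque<String> batches = new ArrayDeque<>(); // Guarded by this.
  private boolean isShutdown = false; // Guarded by this.

  synchronized void enqueue(String batch) {
    batches.addLast(batch);
  }

  synchronized void shutdown() {
    isShutdown = true;
    batches.clear();
  }

  /** A failed condition is only an error while the stream has not been shut down. */
  private synchronized void verify(boolean condition, String message) {
    if (!condition && !isShutdown) {
      throw new IllegalStateException(message);
    }
  }

  /** Removes {@code batch} after it was sent; tolerated as a no-op if shutdown cleared it. */
  synchronized void completeSend(String batch) {
    verify(batch.equals(batches.peekFirst()), "Sent batch removed before send() was complete.");
    batches.pollFirst();
  }
}
```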



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -144,41 +149,57 @@ protected boolean hasPendingRequests() {
   }
 
   @Override
-  public void sendHealthCheck() {
+  public void sendHealthCheck() throws WindmillStreamShutdownException {
 if (has

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839919732


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,129 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();
+// If getPhase() returns a value less than 0, the phaser has been terminated.
+if (awaitPhase < 0) {
+  return;
+}
+  }
+
   // We only check isReady periodically to effectively allow for increasing the outbound
   // buffer periodically. This reduces the overhead of blocking while still restricting
   // memory because there is a limited # of streams, and we have a max messages size of 2MB.
   if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
 outboundObserver.onNext(value);
 return;
   }
-  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-  // careful to observe the phase before observing isReady.
-  if (awaitPhase < 0) {
-awaitPhase = phaser.getPhase();
-  }
+
   if (outboundObserver.isReady()) {
 messagesSinceReady = 0;
 outboundObserver.onNext(value);
 return;
   }
 }
+
 // A callback has been registered to advance the phaser whenever the observer
 // transitions to  is ready. Since we are waiting for a phase observed before the
 // outboundObserver.isReady() returned false, we expect it to advance after the
 // channel has become ready.  This doesn't always seem to be the case (despite
 // documentation stating otherwise) so we poll periodically and enforce an overall
 // timeout related to the stream deadline.
-phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+int nextPhase =
+isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+// If nextPhase is a value less than 0, the phaser has been terminated.
+if (nextPhase < 0) {

Review Comment:
   Added checks under `lock` to `onError` and `onCompleted`; all `onNext` calls are already guarded by `lock`.
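The hunk above waits on the phaser in bounded steps and treats a negative return from `awaitAdvanceInterruptibly` as termination. A minimal sketch of that loop in isolation, using only `java.util.concurrent.Phaser`; `ReadyWaiter`, `awaitReady`, and the `maxSeconds` deadline are hypothetical, and the real observer derives its overall timeout from the stream deadline:

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReadyWaiter {
  enum Outcome { READY, TERMINATED }

  /**
   * Waits for the phaser to advance past {@code awaitPhase}, polling in one-second steps.
   * Returns TERMINATED if the phaser was force-terminated, and rethrows the timeout once
   * {@code maxSeconds} have elapsed without an advance.
   */
  static Outcome awaitReady(Phaser isReadyNotifier, int awaitPhase, long maxSeconds)
      throws InterruptedException, TimeoutException {
    long totalSecondsWaited = 0;
    while (true) {
      try {
        int nextPhase =
            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, 1, TimeUnit.SECONDS);
        // A negative phase means the phaser has been terminated.
        return nextPhase < 0 ? Outcome.TERMINATED : Outcome.READY;
      } catch (TimeoutException e) {
        totalSecondsWaited += 1;
        if (totalSecondsWaited >= maxSeconds) {
          throw e;
        }
      }
    }
  }
}
```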






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839901332


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GrpcGetDataStreamTest {
+  private static final String FAKE_SERVER_NAME = "Fake server for GrpcGetDataStreamTest";
+  private static final Windmill.JobHeader TEST_JOB_HEADER =
+  Windmill.JobHeader.newBuilder()
+  .setJobId("test_job")
+  .setWorkerId("test_worker")
+  .setProjectId("test_project")
+  .build();
+
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  private ManagedChannel inProcessChannel;
+
+  @Before
+  public void setUp() throws IOException {
+Server server =
+InProcessServerBuilder.forName(FAKE_SERVER_NAME)
+.fallbackHandlerRegistry(serviceRegistry)
+.directExecutor()
+.build()
+.start();
+
+inProcessChannel =
+grpcCleanup.register(
+InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build());
+grpcCleanup.register(server);
+grpcCleanup.register(inProcessChannel);
+  }
+
+  @After
+  public void cleanUp() {
+inProcessChannel.shutdownNow();
+  }
+
+  private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) {
+serviceRegistry.addService(testStub);
+GrpcGetDataStream getDataStream =
+(GrpcGetDataStream)
+GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+.setSendKeyedGetDataRequests(false)
+.build()
+.createGetDataStream(
+CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel),
+new ThrottleTimer());
+getDataStream.start();
+return getDataStream;
+  }
+
+  @Test
+  public void testRequestKeyedData_sendOnShutdownStreamThrowsWindmillStreamShutdownException() {

Review Comment:
   done




Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on PR #32774:
URL: https://github.com/apache/beam/pull/32774#issuecomment-2473107804

   still need to add some more tests to `DirectStreamObserverTest`; addressed the other comments
   
   thanks!
   @scwhittle 





Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839902664


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##
@@ -234,8 +229,13 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   @Override
-  public void sendHealthCheck() {
-send(HEALTH_CHECK_REQUEST);
+  public void sendHealthCheck() throws WindmillStreamShutdownException {
+trySend(HEALTH_CHECK_REQUEST);
+  }
+
+  @Override
+  protected void shutdownInternal() {
+workItemAssemblers.clear();

Review Comment:
   removed






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839907045


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -271,12 +297,26 @@ public void onHeartbeatResponse(List resp
   }
 
   @Override
-  public void sendHealthCheck() {
+  public void sendHealthCheck() throws WindmillStreamShutdownException {
 if (hasPendingRequests()) {
-  send(StreamingGetDataRequest.newBuilder().build());
+  trySend(HEALTH_CHECK_REQUEST);
 }
   }
 
+  @Override
+  protected void shutdownInternal() {
+// Stream has been explicitly closed. Drain pending input streams and request batches.
+// Future calls to send RPCs will fail.
+pending.values().forEach(AppendableInputStream::cancel);
+pending.clear();
+batches.forEach(
+batch -> {
+  batch.markFinalized();

Review Comment:
   done
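The `shutdownInternal` override quoted above drains pending state so that callers blocked on a response or on a queued batch are released rather than waiting forever. A minimal sketch of that idea, assuming each pending response is a `CompletableFuture` and each batch is a hypothetical `Batch` with a `markFinalized()` flag (illustrative types only, not Beam's `AppendableInputStream` or `QueuedBatch`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder for a stream's in-flight state; not Beam's GrpcGetDataStream.
final class PendingRequests {
  // Hypothetical queued batch; callers wait on `sent` until the batch is flushed or failed.
  static final class Batch {
    final CompletableFuture<Void> sent = new CompletableFuture<>();
    private boolean finalized = false;

    synchronized void markFinalized() {
      finalized = true;
    }

    synchronized boolean isFinalized() {
      return finalized;
    }
  }

  final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
  final List<Batch> batches = new ArrayList<>();

  // Cancels every in-flight response and fails every queued batch so no caller blocks forever.
  synchronized void shutdownInternal() {
    pending.values().forEach(response -> response.cancel(true));
    pending.clear();
    for (Batch batch : batches) {
      batch.markFinalized(); // no more requests may be added to this batch
      batch.sent.completeExceptionally(new IllegalStateException("Stream was shut down."));
    }
    batches.clear();
  }
}
```

Finalizing a batch before failing it keeps a concurrent caller from appending a request to a batch that will never be sent.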



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -301,39 +341,58 @@ public void appendSpecificHtml(PrintWriter writer) {
 writer.append("]");
   }
 
-  private  ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) {
-while (true) {
+  private  ResponseT issueRequest(QueuedRequest request, ParseFn parseFn)
+  throws WindmillStreamShutdownException {
+while (!isShutdownLocked()) {
   request.resetResponseStream();
   try {
 queueRequestAndWait(request);
 return parseFn.parse(request.getResponseStream());
-  } catch (CancellationException e) {
-// Retry issuing the request since the response stream was cancelled.
-continue;
+  } catch (AppendableInputStream.InvalidInputStreamStateException | CancellationException e) {
+handleShutdown(request, e);
+if (!(e instanceof CancellationException)) {
+  throw e;
+}
   } catch (IOException e) {
 LOG.error("Parsing GetData response failed: ", e);
-continue;
   } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
+handleShutdown(request, e);
 throw new RuntimeException(e);
   } finally {
 pending.remove(request.id());
   }
 }
+
+throw shutdownExceptionFor(request);
   }
 
-  private void queueRequestAndWait(QueuedRequest request) throws InterruptedException {
+  private synchronized void handleShutdown(QueuedRequest request, Throwable cause)

Review Comment:
   done
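The revised `issueRequest` loop quoted above keeps retrying while the stream is open, treats a `CancellationException` as a signal to retry, and throws a shutdown exception once the loop observes that the stream was shut down. The control flow can be sketched in isolation as follows; `StreamShutdownException`, `RetryingIssuer`, and `Attempt` are hypothetical stand-ins (not Beam classes), and the real method additionally parses the response, logs `IOException`s, and handles interruption.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical checked exception standing in for WindmillStreamShutdownException.
class StreamShutdownException extends Exception {
  StreamShutdownException(String message) {
    super(message);
  }
}

// Hypothetical request issuer illustrating the retry-until-shutdown loop.
final class RetryingIssuer {
  // One attempt at sending a request and waiting for its response.
  interface Attempt<T> {
    T run();
  }

  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

  void shutdown() {
    isShutdown.set(true);
  }

  <T> T issueRequest(String requestName, Attempt<T> attempt) throws StreamShutdownException {
    while (!isShutdown.get()) {
      try {
        return attempt.run();
      } catch (CancellationException e) {
        // The response stream was cancelled (for example, the RPC restarted); retry the
        // request unless the stream has been shut down in the meantime.
      }
    }
    throw new StreamShutdownException(
        "Cannot send request=[" + requestName + "] on closed stream.");
  }
}
```

Because the shutdown check is the loop condition, a request that is cancelled by a shutdown surfaces as a checked exception instead of spinning forever, which matches the intent of replacing `while (true)` with `while (!isShutdownLocked())`.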






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839906288


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -187,21 +208,26 @@ private long uniqueId() {
   }
 
   @Override
-  public KeyedGetDataResponse requestKeyedData(String computation, KeyedGetDataRequest request) {
+  public KeyedGetDataResponse requestKeyedData(String computation, KeyedGetDataRequest request)
+  throws WindmillStreamShutdownException {
 return issueRequest(
 QueuedRequest.forComputation(uniqueId(), computation, request),
 KeyedGetDataResponse::parseFrom);
   }
 
   @Override
-  public GlobalData requestGlobalData(GlobalDataRequest request) {
+  public GlobalData requestGlobalData(GlobalDataRequest request)
+  throws WindmillStreamShutdownException {
 return issueRequest(QueuedRequest.global(uniqueId(), request), GlobalData::parseFrom);
   }
 
   @Override
-  public void refreshActiveWork(Map> heartbeats) {
-if (isShutdown()) {
-  throw new WindmillStreamShutdownException("Unable to refresh work for shutdown stream.");
+  public void refreshActiveWork(Map> heartbeats)
+  throws WindmillStreamShutdownException {
+synchronized (this) {
+  if (isShutdown) {

Review Comment:
   done



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -301,39 +341,58 @@ public void appendSpecificHtml(PrintWriter writer) {
 writer.append("]");
   }
 
-  private  ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) {
-while (true) {
+  private  ResponseT issueRequest(QueuedRequest request, ParseFn parseFn)
+  throws WindmillStreamShutdownException {
+while (!isShutdownLocked()) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839903881


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -121,32 +131,44 @@ public static GrpcGetDataStream create(
   int streamingRpcBatchLimit,
   boolean sendKeyedGetDataRequests,
   Consumer> processHeartbeatResponses) {
-GrpcGetDataStream getDataStream =
-new GrpcGetDataStream(
-backendWorkerToken,
-startGetDataRpcFn,
-backoff,
-streamObserverFactory,
-streamRegistry,
-logEveryNStreamFailures,
-getDataThrottleTimer,
-jobHeader,
-idGenerator,
-streamingRpcBatchLimit,
-sendKeyedGetDataRequests,
-processHeartbeatResponses);
-getDataStream.startStream();
-return getDataStream;
+return new GrpcGetDataStream(
+backendWorkerToken,
+startGetDataRpcFn,
+backoff,
+streamObserverFactory,
+streamRegistry,
+logEveryNStreamFailures,
+getDataThrottleTimer,
+jobHeader,
+idGenerator,
+streamingRpcBatchLimit,
+sendKeyedGetDataRequests,
+processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) {
+return new WindmillStreamShutdownException(
+"Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
+  }
+
+  private static WindmillStreamShutdownException shutdownExceptionFor(QueuedRequest request) {
+return new WindmillStreamShutdownException(
+"Cannot send request=[" + request + "] on closed stream.");
+  }
+
+  private void sendIgnoringClosed(StreamingGetDataRequest getDataRequest)
+  throws WindmillStreamShutdownException {
+trySend(getDataRequest);
   }
 
   @Override
-  protected synchronized void onNewStream() {
-send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
-if (clientClosed.get()) {
+  protected synchronized void onNewStream() throws WindmillStreamShutdownException {
+trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
+if (clientClosed) {
   // We rely on close only occurring after all methods on the stream have returned.
   // Since the requestKeyedData and requestGlobalData methods are blocking this
   // means there should be no pending requests.
-  verify(!hasPendingRequests());
+  verify(!hasPendingRequests(), "Pending requests not expected on stream restart.");

Review Comment:
   done
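With this hunk, `GrpcGetDataStream.create(...)` only constructs the stream and no longer calls `startStream()`; the caller starts it explicitly, as the `createGetDataStream` helper in `GrpcGetDataStreamTest` does with `getDataStream.start()`. One plausible benefit of separating construction from starting is that a stream can be shut down before it ever opens an RPC. A minimal sketch of such a lifecycle, using a hypothetical `LifecycleStream` class rather than Beam's `AbstractWindmillStream`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical two-phase stream lifecycle: construct first, start later.
final class LifecycleStream {
  private boolean started = false; // guarded by this
  private boolean isShutdown = false; // guarded by this
  final AtomicInteger rpcStarts = new AtomicInteger(); // counts simulated RPC starts

  // The factory only constructs; it does not open the RPC.
  static LifecycleStream create() {
    return new LifecycleStream();
  }

  // Starts the underlying RPC at most once; calls after shutdown are no-ops.
  synchronized void start() {
    if (started || isShutdown) {
      return;
    }
    started = true;
    rpcStarts.incrementAndGet(); // stand-in for opening the gRPC stream
  }

  synchronized void shutdown() {
    isShutdown = true;
  }

  synchronized boolean isStarted() {
    return started;
  }
}
```

Making `start()` idempotent keeps a second caller from opening a duplicate RPC on the same stream object.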






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839882212


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##
@@ -460,8 +470,9 @@ private void flushResponse() {
   "Sending batched response of {} ids", responseBuilder.getRequestIdCount());
   try {
 responseObserver.onNext(responseBuilder.build());
-  } catch (IllegalStateException e) {
+  } catch (Exception e) {
 // Stream is already closed.
+System.out.println("trieu: " + e);

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839839690


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+
+@RunWith(JUnit4.class)
+public class GrpcCommitWorkStreamTest {
+  private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest";
+  private static final Windmill.JobHeader TEST_JOB_HEADER =
+  Windmill.JobHeader.newBuilder()
+  .setJobId("test_job")
+  .setWorkerId("test_worker")
+  .setProjectId("test_project")
+  .build();
+  private static final String COMPUTATION_ID = "computationId";
+
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  private ManagedChannel inProcessChannel;
+
+  private static Windmill.WorkItemCommitRequest workItemCommitRequest(long value) {
+return Windmill.WorkItemCommitRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setShardingKey(value)
+.setWorkToken(value)
+.setCacheToken(value)
+.build();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+Server server =
+InProcessServerBuilder.forName(FAKE_SERVER_NAME)
+.fallbackHandlerRegistry(serviceRegistry)
+.directExecutor()
+.build()
+.start();
+
+inProcessChannel =
+grpcCleanup.register(
+InProcessChannelBuilder.forName(FAKE_SERVER_NAME).directExecutor().build());
+grpcCleanup.register(server);
+grpcCleanup.register(inProcessChannel);
+  }
+
+  @After
+  public void cleanUp() {
+inProcessChannel.shutdownNow();
+  }
+
+  private GrpcCommitWorkStream createCommitWorkStream(CommitWorkStreamTestStub testStub) {
+serviceRegistry.addService(testStub);
+GrpcCommitWorkStream commitWorkStream =
+(GrpcCommitWorkStream)
+GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+.build()
+.createCommitWorkStream(
+CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel),
+new 

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-13 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839868939


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GrpcGetDataStreamRequestsTest {
+
+  @Test
+  public void testQueuedRequest_globalRequestsFirstComparator() {
+List requests = new ArrayList<>();
+Windmill.KeyedGetDataRequest keyedGetDataRequest1 =
+Windmill.KeyedGetDataRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setCacheToken(1L)
+.setShardingKey(1L)
+.setWorkToken(1L)
+.setMaxBytes(Long.MAX_VALUE)
+.build();
+requests.add(
+GrpcGetDataStreamRequests.QueuedRequest.forComputation(
+1, "computation1", keyedGetDataRequest1));
+
+Windmill.KeyedGetDataRequest keyedGetDataRequest2 =
+Windmill.KeyedGetDataRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setCacheToken(2L)
+.setShardingKey(2L)
+.setWorkToken(2L)
+.setMaxBytes(Long.MAX_VALUE)
+.build();
+requests.add(
+GrpcGetDataStreamRequests.QueuedRequest.forComputation(
+2, "computation2", keyedGetDataRequest2));
+
+Windmill.GlobalDataRequest globalDataRequest =
+Windmill.GlobalDataRequest.newBuilder()
+.setDataId(
+Windmill.GlobalDataId.newBuilder()
+.setTag("globalData")
+.setVersion(ByteString.EMPTY)
+.build())
+.setComputationId("computation1")
+.build();
+requests.add(GrpcGetDataStreamRequests.QueuedRequest.global(3, globalDataRequest));
+
+requests.sort(GrpcGetDataStreamRequests.QueuedRequest.globalRequestsFirst());
+
+// First one should be the global request.
+assertTrue(requests.get(0).getDataRequest().isGlobal());
+  }
+
+  @Test
+  public void testQueuedBatch_asGetDataRequest() {
+GrpcGetDataStreamRequests.QueuedBatch queuedBatch = new GrpcGetDataStreamRequests.QueuedBatch();
+
+Windmill.KeyedGetDataRequest keyedGetDataRequest1 =
+Windmill.KeyedGetDataRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setCacheToken(1L)
+.setShardingKey(1L)
+.setWorkToken(1L)
+.setMaxBytes(Long.MAX_VALUE)
+.build();
+queuedBatch.addRequest(
+GrpcGetDataStreamRequests.QueuedRequest.forComputation(
+1, "computation1", keyedGetDataRequest1));
+
+Windmill.KeyedGetDataRequest keyedGetDataRequest2 =
+Windmill.KeyedGetDataRequest.newBuilder()
+.setKey(ByteString.EMPTY)
+.setCacheToken(2L)
+.setShardingKey(2L)
+.setWorkToken(2L)
+.setMaxBytes(Long.MAX_VALUE)
+.build();
+queuedBatch.addRequest(
+GrpcGetDataStreamRequests.QueuedRequest.forComputation(
+2, "computation2", keyedGetDataRequest2));
+
+Windmill.GlobalDataRequest globalDataRequest =
+Windmill.GlobalDataRequest.newBuilder()
+.setDataId(
+Windmill.GlobalDataId.newBuilder()
+.setTag("globalData")
+.setVersion(ByteString.EMPTY)
+.build())
+.setComputationId("computation1")
+.build

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1839380434


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.io.PrintWriter;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class AbstractWindmillStreamTest {
+  private static final long DEADLINE_SECONDS = 10;
+  private final Set> streamRegistry = ConcurrentHashMap.newKeySet();
+  private final StreamObserverFactory streamObserverFactory =
+  StreamObserverFactory.direct(DEADLINE_SECONDS, 1);
+
+  @Before
+  public void setUp() {
+streamRegistry.clear();
+  }
+
+  private TestStream newStream(
+  Function, StreamObserver> clientFactory) {
+return new TestStream(clientFactory, streamRegistry, streamObserverFactory);
+  }
+
+  @Test
+  public void testShutdown_notBlockedBySend() throws InterruptedException, ExecutionException {
+CountDownLatch sendBlocker = new CountDownLatch(1);
+Function, StreamObserver> clientFactory =
+ignored ->
+new CallStreamObserver() {
+  @Override
+  public void onNext(Integer integer) {
+try {
+  sendBlocker.await();
+} catch (InterruptedException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public void onError(Throwable throwable) {}
+
+  @Override
+  public void onCompleted() {}
+
+  @Override
+  public boolean isReady() {
+return false;
+  }
+
+  @Override
+  public void setOnReadyHandler(Runnable runnable) {}
+
+  @Override
+  public void disableAutoInboundFlowControl() {}
+
+  @Override
+  public void request(int i) {}
+
+  @Override
+  public void setMessageCompression(boolean b) {}
+};
+
+TestStream testStream = newStream(clientFactory);
+testStream.start();
+ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
+Future sendFuture =
+sendExecutor.submit(
+() ->
+assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1)));
+testStream.shutdown();

Review Comment:
   done
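The test above expects `shutdown()` to return even though `send` is stuck (the fake observer's `isReady()` always returns `false` and its `onNext` blocks on `sendBlocker`), and expects the blocked sender to fail with `WindmillStreamShutdownException`. One way to get that behaviour, in the spirit of the `isReadyNotifier` phaser in `DirectStreamObserver`, is for the sender to wait on a `java.util.concurrent.Phaser` without holding any lock and for shutdown to call `forceTermination()`, which wakes every waiter with a negative phase. `ReadyGate` and `GateShutdownException` are hypothetical names for this sketch:

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for WindmillStreamShutdownException.
class GateShutdownException extends Exception {
  GateShutdownException(String message) {
    super(message);
  }
}

// Hypothetical readiness gate: senders wait here without holding any stream lock.
final class ReadyGate {
  // One registered party, so each arrive() advances the phase by one.
  private final Phaser isReadyNotifier = new Phaser(1);

  // Would be called from the transport's on-ready callback.
  void signalReady() {
    isReadyNotifier.arrive();
  }

  // Waits for the next readiness signal, polling in 1 second steps, and fails once the
  // gate is shut down.
  void awaitReady() throws GateShutdownException, InterruptedException {
    int phase = isReadyNotifier.getPhase();
    if (phase < 0) {
      throw new GateShutdownException("Gate was shut down before waiting.");
    }
    while (true) {
      try {
        int nextPhase = isReadyNotifier.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS);
        if (nextPhase < 0) {
          throw new GateShutdownException("Gate was shut down while waiting.");
        }
        return;
      } catch (TimeoutException e) {
        // Still not ready; keep waiting for the same phase.
      }
    }
  }

  // Terminating the phaser wakes every waiter; their awaits return a negative phase.
  void shutdown() {
    isReadyNotifier.forceTermination();
  }
}
```

Since `shutdown()` never takes a lock that the waiting sender holds, it cannot be blocked by a stuck `send`.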






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838568276


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,128 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();

Review Comment:
   done
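The hunk above reads the phase before calling `isReady()` and treats a negative phase as termination. Reading the phase first means an on-ready callback that fires between the `isReady()` check and the wait still advances past `awaitPhase`, so the wait returns immediately instead of missing the signal. A standalone sketch of that ordering, together with the overall deadline from the original loop, assuming a hypothetical `ReadinessWaiter` whose `isReady` supplier stands in for `CallStreamObserver.isReady()`:

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper mirroring the wait logic in the quoted onNext; not Beam code.
final class ReadinessWaiter {
  private final Phaser isReadyNotifier = new Phaser(1);
  private final BooleanSupplier isReady; // stand-in for CallStreamObserver.isReady()
  private final long deadlineSeconds;

  ReadinessWaiter(BooleanSupplier isReady, long deadlineSeconds) {
    this.isReady = isReady;
    this.deadlineSeconds = deadlineSeconds;
  }

  // Would be registered as the transport's on-ready handler.
  void onReady() {
    isReadyNotifier.arrive();
  }

  void terminate() {
    isReadyNotifier.forceTermination();
  }

  /** Returns true once ready, false if terminated; rethrows the timeout past the deadline. */
  boolean awaitReady() throws InterruptedException, TimeoutException {
    // Observe the phase BEFORE checking isReady: an onReady() racing with the check still
    // advances past awaitPhase, so the wait below returns immediately instead of missing it.
    int awaitPhase = isReadyNotifier.getPhase();
    if (awaitPhase < 0) {
      return false; // negative phase: the phaser has been terminated
    }
    long totalSecondsWaited = 0;
    long waitSeconds = 1;
    while (true) {
      if (isReady.getAsBoolean()) {
        return true;
      }
      try {
        int nextPhase =
            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
        return nextPhase >= 0; // negative: terminated while waiting
      } catch (TimeoutException e) {
        // Callbacks are not always delivered, so poll isReady again until the deadline.
        totalSecondsWaited += waitSeconds;
        if (totalSecondsWaited > deadlineSeconds) {
          throw e;
        }
      }
    }
  }
}
```

With `deadlineSeconds = 0` and an `isReady` supplier that never turns true, `awaitReady()` throws `TimeoutException` after the first one-second poll, which corresponds to the "exceeded timeout" branch in the quoted diff.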






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838725854


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,128 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();
+// If getPhase() returns a value less than 0, the phaser has been terminated.
+if (awaitPhase < 0) {
+  return;
+}
+  }
+
   // We only check isReady periodically to effectively allow for increasing the outbound
   // buffer periodically. This reduces the overhead of blocking while still restricting
   // memory because there is a limited # of streams, and we have a max messages size of 2MB.
   if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
 outboundObserver.onNext(value);
 return;
   }
-  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-  // careful to observe the phase before observing isReady.
-  if (awaitPhase < 0) {
-awaitPhase = phaser.getPhase();
-  }
+
   if (outboundObserver.isReady()) {
 messagesSinceReady = 0;
 outboundObserver.onNext(value);
 return;
   }
 }
+
 // A callback has been registered to advance the phaser whenever the observer
 // transitions to  is ready. Since we are waiting for a phase observed before the
 // outboundObserver.isReady() returned false, we expect it to advance after the
 // channel has become ready.  This doesn't always seem to be the case (despite
 // documentation stating otherwise) so we poll periodically and enforce an overall
 // timeout related to the stream deadline.
-phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+int nextPhase =
+isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+// If nextPhase is a value less than 0, the phaser has been terminated.
+if (nextPhase < 0) {
+  throw new StreamObserverCancelledException("StreamObserver was terminated.");
+}
+
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
   messagesSinceReady = 0;
   outboundObserver.onNext(value);
   return;
 }
   } catch (TimeoutException e) {
 totalSecondsWaited += waitSeconds;
 if (totalSecondsWaited > deadlineSeconds) {
-  LOG.error(
-  "Exceeded timeout waiting for the outboundObserver to become ready meaning "
-  + "that the stream deadline was not respected.");
-  throw new RuntimeException(e);
+  String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited);
+  LOG.error(errorMessage);
+  throw new StreamObserverCancelledException(errorMessage, e);
 }
-if (totalSecondsWaited > 30) {
+
+if (totalSecondsWaited > OUTPUT_CHANNEL_CONSIDERED_STALLED_SECONDS) {
   LOG.info(
   "Output channel stalled for {}s, outbound thread {}.",
   totalSecondsWaited,
   Thread.currentThread().getName());
 }
+
 waitSeconds = waitSeconds * 2;
   } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
-throw new RuntimeException(e);
+throw new StreamObserverCancelledException(e);
   }
 }
   }
 
   @Override
   public void onError(Throwable t) {
+isReadyNoti

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838725029


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
 commitWorkThrottleTimer.stop();
 
-RuntimeException finalException = null;
+CommitCompletionException failures = new CommitCompletionException();

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838707399


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -139,4 +166,29 @@ public void onCompleted() {
   outboundObserver.onCompleted();
 }
   }
+
+  @Override
+  public void terminate(Throwable terminationException) {
+// Free the blocked threads in onNext().
+isReadyNotifier.forceTermination();

Review Comment:
   done
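A minimal sketch of why `terminate()` calls `forceTermination()` first: a thread parked in `Phaser.awaitAdvanceInterruptibly` is released as soon as the phaser is terminated and gets a negative phase back, so it stops waiting instead of running out its timeout. `BlockedSenderDemo` and `awaitReady` are hypothetical names, not the PR's `DirectStreamObserver` code.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for an observer whose onNext() waits for an isReady callback. */
class BlockedSenderDemo {
  // One registered party, so the phase only advances when that party arrives (the "ready" callback).
  final Phaser isReadyNotifier = new Phaser(1);

  /** Waits for the next phase; the result is negative if the phaser was terminated. */
  int awaitReady(long timeoutSeconds) throws InterruptedException, TimeoutException {
    int phase = isReadyNotifier.getPhase();
    if (phase < 0) {
      return phase; // Already terminated: do not wait at all.
    }
    return isReadyNotifier.awaitAdvanceInterruptibly(phase, timeoutSeconds, TimeUnit.SECONDS);
  }

  /** Mirrors terminate(): releases every thread blocked in awaitReady(). */
  void terminate() {
    isReadyNotifier.forceTermination();
  }
}
```

In the diff's `onNext()`, a negative result is turned into a `StreamObserverCancelledException`; the sketch simply returns it.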






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838705361


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");

Review Comment:
   done
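A simplified sketch of the guard order in `send()` as shown in this diff: once the shutdown signal has been received the request is dropped silently, a client-closed stream throws `IllegalStateException`, and `shutdownLock` must not be held while sending. `GuardedSender` is hypothetical; a plain `IllegalStateException` stands in for Guava's `verify(...)`, and a `List` stands in for the request observer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the guard order in send(). */
class GuardedSender {
  private final Object shutdownLock = new Object();
  private boolean isShutdown = false; // Guarded by shutdownLock.
  private boolean streamClosed = false; // Guarded by this.
  final List<String> sent = new ArrayList<>(); // Stands in for the request observer.

  boolean hasReceivedShutdownSignal() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  synchronized void closeStream() {
    streamClosed = true;
  }

  synchronized void send(String request) {
    // 1. After shutdown, drop the request silently; shutdown() owns cleanup of pending work.
    if (hasReceivedShutdownSignal()) {
      return;
    }
    // 2. Sending on a stream the client already closed is a programming error.
    if (streamClosed) {
      throw new IllegalStateException("Send called on a client closed stream.");
    }
    // 3. shutdownLock must not be held while sending, since the send may block on IO.
    if (Thread.holdsLock(shutdownLock)) {
      throw new IllegalStateException("shutdownLock should not be held during send.");
    }
    sent.add(request);
  }
}
```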






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838574677


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,128 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();
+// If getPhase() returns a value less than 0, the phaser has been terminated.
+if (awaitPhase < 0) {
+  return;
+}
+  }
+
   // We only check isReady periodically to effectively allow for increasing the outbound
   // buffer periodically. This reduces the overhead of blocking while still restricting
   // memory because there is a limited # of streams, and we have a max messages size of 2MB.
   if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
 outboundObserver.onNext(value);
 return;
   }
-  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-  // careful to observe the phase before observing isReady.
-  if (awaitPhase < 0) {
-awaitPhase = phaser.getPhase();
-  }
+
   if (outboundObserver.isReady()) {
 messagesSinceReady = 0;
 outboundObserver.onNext(value);
 return;
   }
 }
+
 // A callback has been registered to advance the phaser whenever the observer
 // transitions to  is ready. Since we are waiting for a phase observed before the
 // outboundObserver.isReady() returned false, we expect it to advance after the
 // channel has become ready.  This doesn't always seem to be the case (despite
 // documentation stating otherwise) so we poll periodically and enforce an overall
 // timeout related to the stream deadline.
-phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+int nextPhase =
+isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+// If nextPhase is a value less than 0, the phaser has been terminated.
+if (nextPhase < 0) {
+  throw new StreamObserverCancelledException("StreamObserver was terminated.");
+}
+
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
   messagesSinceReady = 0;
   outboundObserver.onNext(value);
   return;
 }
   } catch (TimeoutException e) {
 totalSecondsWaited += waitSeconds;
 if (totalSecondsWaited > deadlineSeconds) {
-  LOG.error(
-  "Exceeded timeout waiting for the outboundObserver to become ready meaning "
-  + "that the stream deadline was not respected.");
-  throw new RuntimeException(e);
+  String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited);
+  LOG.error(errorMessage);
+  throw new StreamObserverCancelledException(errorMessage, e);

Review Comment:
   done
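A small simulation of the wait schedule in `onNext()`, assuming every `awaitAdvanceInterruptibly` call times out: the wait doubles each round and the loop gives up once the accumulated wait exceeds `deadlineSeconds`. `BackoffSchedule` is a hypothetical name. One consequence worth noting is that the final wait can overshoot the deadline by nearly a factor of two; with a 300s deadline the waits total 511s before the exception is thrown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the onNext() wait schedule when the channel never becomes ready. */
class BackoffSchedule {
  /** Returns the wait durations (in seconds) attempted before the deadline is exceeded. */
  static List<Long> waitsUntilDeadline(long deadlineSeconds) {
    List<Long> waits = new ArrayList<>();
    long totalSecondsWaited = 0;
    long waitSeconds = 1;
    while (true) {
      // Stands in for awaitAdvanceInterruptibly(awaitPhase, waitSeconds, SECONDS) timing out.
      waits.add(waitSeconds);
      totalSecondsWaited += waitSeconds;
      if (totalSecondsWaited > deadlineSeconds) {
        // The diff logs and throws StreamObserverCancelledException at this point.
        return waits;
      }
      waitSeconds = waitSeconds * 2;
    }
  }
}
```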






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838585634


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/TerminatingStreamObserver.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers;
+
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+
+@Internal
+public interface TerminatingStreamObserver extends StreamObserver {
+
+  /** Terminates the StreamObserver. */

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838570350


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,128 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();
+// If getPhase() returns a value less than 0, the phaser has been terminated.
+if (awaitPhase < 0) {
+  return;
+}
+  }
+
   // We only check isReady periodically to effectively allow for increasing the outbound
   // buffer periodically. This reduces the overhead of blocking while still restricting
   // memory because there is a limited # of streams, and we have a max messages size of 2MB.
   if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
 outboundObserver.onNext(value);
 return;
   }
-  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-  // careful to observe the phase before observing isReady.
-  if (awaitPhase < 0) {
-awaitPhase = phaser.getPhase();
-  }
+
   if (outboundObserver.isReady()) {
 messagesSinceReady = 0;
 outboundObserver.onNext(value);
 return;
   }
 }
+
 // A callback has been registered to advance the phaser whenever the observer
 // transitions to  is ready. Since we are waiting for a phase observed before the
 // outboundObserver.isReady() returned false, we expect it to advance after the
 // channel has become ready.  This doesn't always seem to be the case (despite
 // documentation stating otherwise) so we poll periodically and enforce an overall
 // timeout related to the stream deadline.
-phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+int nextPhase =
+isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+// If nextPhase is a value less than 0, the phaser has been terminated.
+if (nextPhase < 0) {
+  throw new StreamObserverCancelledException("StreamObserver was terminated.");
+}
+
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838561714


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
   "Output channel stalled for {}s, outbound thread {}.",
   totalSecondsWaited,
   Thread.currentThread().getName());
+  Thread.dumpStack();

Review Comment:
   Removed; this was for debugging.



##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##
@@ -472,7 +473,8 @@ private void flushResponse() {
 responseObserver.onNext(responseBuilder.build());
   } catch (Exception e) {
 // Stream is already closed.
-System.out.println("trieu: " + e);
+LOG.warn("trieu: ", e);

Review Comment:
   Removed; this was for debugging.






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838569451


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,128 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;

Review Comment:
   done
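A minimal sketch of the invariant described in the comment on these lines, assuming `onCompleted()` terminates the phaser before taking the same lock `onNext()` uses: any `onNext()` that sees a non-negative phase under the lock runs before the observer is closed, and any later call sees a negative phase and returns. `PhaseGuardedObserver` is hypothetical, and a `List` stands in for `outboundObserver`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

/** Hypothetical model of the "check for termination under the lock" guard in onNext(). */
class PhaseGuardedObserver {
  private final Object lock = new Object();
  private final Phaser isReadyNotifier = new Phaser(1);
  final List<String> delivered = new ArrayList<>(); // Stands in for outboundObserver.
  boolean closed = false; // Guarded by lock.

  void onNext(String value) {
    synchronized (lock) {
      // A negative phase means the phaser was terminated, so the outbound side may be closed.
      if (isReadyNotifier.getPhase() < 0) {
        return;
      }
      delivered.add(value);
    }
  }

  void onCompleted() {
    // Terminate first (this also releases waiters), then close under the lock onNext() uses.
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      closed = true;
    }
  }
}
```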






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838563507


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -317,10 +320,13 @@ public String backendWorkerToken() {
 return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")
   @Override
   public final void shutdown() {
-// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown.
+// Don't lock on "this" before poisoning the request observer since otherwise the observer may
+// be blocking in send().
 requestObserver.poison();
+isShutdown = true;

Review Comment:
   Cleaned up; this was supposed to stay within the synchronized block.
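A hypothetical sketch of the `start()`/`shutdown()` guards this PR adds: `start()` decides under the lock whether to start and then calls `startStream()` outside it, so repeated calls start at most once and a shut-down stream never starts. `StreamLifecycle` is an illustrative name; request-observer poisoning, the stream registry, and the exact lock guarding the shutdown flag are simplified away.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the start()/shutdown() guards; not the PR's AbstractWindmillStream. */
class StreamLifecycle {
  private boolean started = false; // Guarded by this.
  private boolean isShutdown = false; // Guarded by this.
  final AtomicInteger startStreamCalls = new AtomicInteger();

  void start() {
    boolean shouldStartStream = false;
    synchronized (this) {
      if (!isShutdown && !started) {
        started = true;
        shouldStartStream = true;
      }
    }
    // Start outside the lock so slow stream creation does not block other callers.
    if (shouldStartStream) {
      startStream();
    }
  }

  void shutdown() {
    // In the diff, the request observer is poisoned before any lock is taken; omitted here.
    synchronized (this) {
      isShutdown = true;
    }
  }

  private void startStream() {
    startStreamCalls.incrementAndGet(); // Stands in for creating the underlying stream.
  }
}
```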






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838119412


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
   "Output channel stalled for {}s, outbound thread {}.",
   totalSecondsWaited,
   Thread.currentThread().getName());
+  Thread.dumpStack();

Review Comment:
   If you want to submit this, log it instead. We've used things like `StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10);` elsewhere to get a string to log.
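One way to follow this suggestion without assuming the signature of `StringUtils.arrayToNewlines`: a plain-Java helper that joins the first N frames with newlines so the result can be passed to the logger. `StackTraces.firstFrames` is a hypothetical helper, not Beam's `StringUtils`.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical plain-Java helper; not Beam's StringUtils. */
class StackTraces {
  /** Joins at most maxFrames stack frames with newlines so the result can be logged. */
  static String firstFrames(StackTraceElement[] frames, int maxFrames) {
    return Arrays.stream(frames)
        .limit(maxFrames)
        .map(StackTraceElement::toString)
        .collect(Collectors.joining("\n"));
  }
}
```

It could then replace `Thread.dumpStack()` with something like `LOG.info("... stack: {}", StackTraces.firstFrames(Thread.currentThread().getStackTrace(), 10));`.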



##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##
@@ -472,7 +473,8 @@ private void flushResponse() {
 responseObserver.onNext(responseBuilder.build());
   } catch (Exception e) {
 // Stream is already closed.
-System.out.println("trieu: " + e);
+LOG.warn("trieu: ", e);

Review Comment:
   rm debug logs



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -144,41 +149,57 @@ protected boolean hasPendingRequests() {
   }
 
   @Override
-  public void sendHealthCheck() {
+  public void sendHealthCheck() throws WindmillStreamShutdownException {
 if (hasPendingRequests()) {
   StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
   builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
-  send(builder.build());
+  trySend(builder.build());
 }
   }
 
   @Override
   protected void onResponse(StreamingCommitResponse response) {
 commitWorkThrottleTimer.stop();
 
-RuntimeException finalException = null;
+CommitCompletionException failures = new CommitCompletionException();
 for (int i = 0; i < response.getRequestIdCount(); ++i) {

Review Comment:
   Pinging the comment above since it might get lost in the GitHub comment threads. I was thinking the builder would not itself be an exception, and the exception would then be a simple class without mutating methods.
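A sketch of the shape this comment describes, assuming failures are collected while iterating over the response's request ids: a plain builder accumulates throwables, and the exception it produces exposes no mutating methods of its own. `CommitFailuresBuilder` and `CommitFailures` are hypothetical names, not the PR's `CommitCompletionException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical immutable exception: all state is fixed at construction time. */
final class CommitFailures extends RuntimeException {
  private final List<Throwable> causes;

  CommitFailures(List<Throwable> causes) {
    super("Failed to complete " + causes.size() + " commit callback(s).");
    this.causes = Collections.unmodifiableList(new ArrayList<>(causes));
    // Attach the individual failures so they show up in the logged stack trace.
    for (Throwable cause : this.causes) {
      addSuppressed(cause);
    }
  }

  List<Throwable> causes() {
    return causes;
  }
}

/** Hypothetical builder: collects failures and only creates the exception at the end. */
final class CommitFailuresBuilder {
  private final List<Throwable> failures = new ArrayList<>();

  CommitFailuresBuilder add(Throwable failure) {
    failures.add(failure);
    return this;
  }

  /** Returns an exception only if at least one failure was recorded. */
  Optional<CommitFailures> build() {
    return failures.isEmpty() ? Optional.empty() : Optional.of(new CommitFailures(failures));
  }
}
```

At the end of `onResponse`, something like `builder.build().ifPresent(e -> { throw e; });` would rethrow once every callback has run.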



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,76 +72,129 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();
+// If getPhase() returns a value less than 0, the phaser has been terminated.
+if (awaitPhase < 0) {
+  return;
+}
+  }
+
   // We only check isReady periodically to effectively allow for increasing the outbound
   // buffer periodically. This reduces the overhead of blocking while still restricting
   // memory because there is a limited # of streams, and we have a max messages size of 2MB.
   if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
 outboundObserver.onNext(value);
 return;
   }
-  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-  // careful to observe the phase before observing isReady.
-  if (awaitPhase < 0) {
-awaitPhase = phaser.getPhase();
-  }
+
   if (outboundObserver.isReady()) {
 messagesSinceReady = 0;
 outboundObserver.onNext(value);
 return;
   }
 }
+
 // A callback has been registered to advance the phaser whenever the observer
 // transitions to  is ready. Since we are waiting for a phase observed before the
 // outboundObserver.isReady() returned false, we expect it to advance after the
 // channel has become ready.  This doesn't al

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837622491


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -152,114 +149,157 @@ private static long debugDuration(long nowMs, long startMs) {
*/
   protected abstract void startThrottleTimer();
 
-  /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  /** Try to send a request to the server. Returns true if the request was successfully sent. */
+  @CanIgnoreReturnValue
+  protected final synchronized boolean trySend(RequestT request)
+  throws WindmillStreamShutdownException {
+debugMetrics.recordSend();
+try {
+  requestObserver.onNext(request);
+  return true;
+} catch (StreamClosedException e) {
+  // Stream was broken, requests may be retried when stream is reopened.
 }
 
-return requestObserver;
+return false;
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
 synchronized (this) {
-  if (streamClosed.get()) {
-throw new IllegalStateException("Send called on a client closed stream.");
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
   }
+}
 
-  requestObserver().onNext(request);
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  debugMetrics.recordStart();
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+// shutdown() is responsible for cleaning up pending requests.
+logger.debug("Stream was shutdown while creating new stream.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",
+  getClass());
+  // Shutdown the stream to clean up any dangling resources and pending requests.
+  shutdown();
+  break;
+} catch (IOException ioe) {
   // Keep trying to create the stream.
 }
   }
 }
+
+// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+// removed when closed.
+streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+try {
+  executor.execute(runnable);
+} catch (RejectedExecutionException e) {
+  logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+}
   }
 
   public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
   try {
 sendHealthCheck();
-  } catch (RuntimeException e) {
-LOG.debug("Received exception sending health check.", e);
+  } catch (Exception e) {
+logger.debug("Received exception sending health check.", e);
   }
 }
   }
 
-  p

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-12 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837621434


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -305,77 +341,96 @@ public void onNext(ResponseT response) {
   } catch (IOException e) {
 // Ignore.
   }
-  lastResponseTimeMs.set(Instant.now().getMillis());
+  debugMetrics.recordResponse();
   onResponse(response);
 }
 
 @Override
 public void onError(Throwable t) {
-  onStreamFinished(t);
+  if (maybeTeardownStream()) {
+return;
+  }
+
+  recordStreamStatus(Status.fromThrowable(t));
+
+  try {
+long sleep = backoff.nextBackOffMillis();
+debugMetrics.recordSleep(sleep);
+sleeper.sleep(sleep);
+  } catch (InterruptedException e) {
+Thread.currentThread().interrupt();
+return;
+  } catch (IOException e) {
+// Ignore.
+  }
+
+  executeSafely(AbstractWindmillStream.this::startStream);
 }
 
 @Override
 public void onCompleted() {
-  onStreamFinished(null);
+  if (maybeTeardownStream()) {
+return;
+  }
+  recordStreamStatus(OK_STATUS);
+  executeSafely(AbstractWindmillStream.this::startStream);
 }
 
-private void onStreamFinished(@Nullable Throwable t) {
-  synchronized (this) {
-if (isShutdown.get() || (clientClosed.get() && !hasPendingRequests())) {
-  streamRegistry.remove(AbstractWindmillStream.this);
-  finishLatch.countDown();
-  return;
-}
-  }
-  if (t != null) {
-Status status = null;
-if (t instanceof StatusRuntimeException) {
-  status = ((StatusRuntimeException) t).getStatus();
-}
-String statusError = status == null ? "" : status.toString();
-setLastError(statusError);
-if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+private void recordStreamStatus(Status status) {
+  int currentRestartCount = debugMetrics.incrementAndGetRestarts();
+  if (status.isOk()) {
+String restartReason =
+"Stream completed successfully but did not complete requested operations, "
++ "recreating";
+logger.warn(restartReason);
+debugMetrics.recordRestartReason(restartReason);
+  } else {
+int currentErrorCount = debugMetrics.incrementAndGetErrors();
+debugMetrics.recordRestartReason(status.toString());
+Throwable t = status.getCause();
+if (t instanceof StreamObserverCancelledException) {
+  logger.error(
+  "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
+  getClass(),
+  backendWorkerToken,
+  t.getStackTrace(),
+  t);
+} else if (currentRestartCount % logEveryNStreamFailures == 0) {
+  // Don't log every restart since it will get noisy, and many errors transient.
   long nowMillis = Instant.now().getMillis();
-  String responseDebug;
-  if (lastResponseTimeMs.get() == 0) {
-responseDebug = "never received response";
-  } else {
-responseDebug =
-"received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
-  }
-  LOG.debug(
-  "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
-  + " created {}ms ago, {}. This is normal with autoscaling.",
+  logger.debug(
+  "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
+  + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
   AbstractWindmillStream.this.getClass(),
-  errorCount.get(),
+  currentRestartCount,
+  currentErrorCount,
   t,
-  statusError,
-  nowMillis - startTimeMs.get(),
-  responseDebug);
+  status,
+  nowMillis - debugMetrics.getStartTimeMs(),
+  debugMetrics
+  .responseDebugString(nowMillis)
+  .orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
 }
+
 // If the stream was stopped due to a resource exhausted error then we are throttled.
-if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {

Review Comment:
   done
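For reference, the throttled logging in `recordStreamStatus` (count every restart, but only log when the updated count is a multiple of `logEveryNStreamFailures`) can be sketched on its own. This is a minimal, dependency-free sketch; `RestartLogThrottle` and its methods are hypothetical names, not Beam classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: counts restarts and errors, and decides when a restart is logged. */
final class RestartLogThrottle {
  private final int logEveryN;
  private final AtomicInteger restarts = new AtomicInteger();
  private final AtomicInteger errors = new AtomicInteger();

  RestartLogThrottle(int logEveryN) {
    if (logEveryN <= 0) {
      throw new IllegalArgumentException("logEveryN must be positive");
    }
    this.logEveryN = logEveryN;
  }

  /**
   * Records one restart (and one error if {@code isError}). Returns true only when the
   * updated restart count is a multiple of logEveryN, so most restarts stay silent.
   */
  boolean recordRestart(boolean isError) {
    int currentRestarts = restarts.incrementAndGet();
    if (isError) {
      errors.incrementAndGet();
    }
    return currentRestarts % logEveryN == 0;
  }

  int restartCount() {
    return restarts.get();
  }

  int errorCount() {
    return errors.get();
  }
}
```

A caller would emit its debug log only when `recordRestart` returns true; because the counters are atomic, concurrent restarts never skip or double-count a logging slot.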



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-11 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837624075


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -278,23 +318,19 @@ public String backendWorkerToken() {
   }
 
   @Override
-  public void shutdown() {
-if (isShutdown.compareAndSet(false, true)) {
-  requestObserver()
-  .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+  public final void shutdown() {
+// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown.

Review Comment:
   done
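The ordering in the shutdown comment (poison the request observer first, then take the lock) matters because a sender may hold the stream's lock while blocked on IO. A minimal sketch, assuming a `CountDownLatch` stands in for "the channel became ready"; `PoisonFirstStream` and its methods are hypothetical, not Beam's implementation.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stream illustrating "poison first, then lock" shutdown ordering. */
final class PoisonFirstStream {
  // Released when the channel becomes ready or when the stream is poisoned.
  private final CountDownLatch readyOrPoisoned = new CountDownLatch(1);
  private volatile boolean poisoned = false;
  private boolean shutdown = false; // guarded by this

  /** Simulates a send that waits for readiness while holding the stream lock. */
  synchronized boolean send(String message) throws InterruptedException {
    readyOrPoisoned.await();
    // The message is dropped (false) if the stream was poisoned while we waited.
    return !poisoned;
  }

  /** Channel-ready callback in this sketch. */
  void markReady() {
    readyOrPoisoned.countDown();
  }

  /**
   * Poisons outside the lock so a sender blocked in send() is released first; locking
   * on this before poisoning could wait indefinitely behind that sender.
   */
  void shutdown() {
    poisoned = true;
    readyOrPoisoned.countDown();
    synchronized (this) {
      shutdown = true;
    }
  }

  synchronized boolean isShutdown() {
    return shutdown;
  }
}
```

If `shutdown()` instead started with `synchronized (this)`, it would queue behind the blocked sender until the channel happened to become ready.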






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-11 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837626505


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+
+/**
+ * Request observer that allows resetting its internal delegate using the given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
+ * {@link ThreadSafe}. Has same methods declared in {@link StreamObserver}, but they throw
+ * {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which much be
+ * handled by callers.
+ */
+@ThreadSafe
+@Internal
+final class ResettableThrowingStreamObserver {
+  private final Supplier> streamObserverFactory;
+  private final Logger logger;
+
+  @GuardedBy("this")
+  private @Nullable TerminatingStreamObserver delegateStreamObserver;
+
+  @GuardedBy("this")
+  private boolean isPoisoned = false;
+
+  /**
+   * Indicates that the current delegate is closed via {@link #poison() or {@link #onCompleted()}}.
+   * If not poisoned, a call to {@link #reset()} is required to perform future operations on the
+   * StreamObserver.
+   */
+  @GuardedBy("this")
+  private boolean isCurrentStreamClosed = false;
+
+  ResettableThrowingStreamObserver(
+  Supplier> streamObserverFactory, Logger logger) {
+this.streamObserverFactory = streamObserverFactory;
+this.logger = logger;
+this.delegateStreamObserver = null;
+  }
+
+  private synchronized StreamObserver delegate()
+  throws WindmillStreamShutdownException, StreamClosedException {
+if (isPoisoned) {
+  throw new WindmillStreamShutdownException("Stream is already shutdown.");
+}
+
+if (isCurrentStreamClosed) {
+  throw new StreamClosedException(
+  "Current stream is closed, requires reset for future stream operations.");
+}
+
+return Preconditions.checkNotNull(
+delegateStreamObserver,
+"requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  }
+
+  /** Creates a new delegate to use for future {@link StreamObserver} methods. */
+  synchronized void reset() throws WindmillStreamShutdownException {
+if (isPoisoned) {
+  throw new WindmillStreamShutdownException("Stream is already shutdown.");
+}
+
+delegateStreamObserver = streamObserverFactory.get();
+isCurrentStreamClosed = false;
+  }
+
+  /**
+   * Indicates that the request observer should no longer be used. Attempts to perform operations on
+   * the request observer will throw an {@link WindmillStreamShutdownException}.
+   */
+  synchronized void poison() {
+if (!isPoisoned) {
+  isPoisoned = true;
+  if (delegateStreamObserver != null) {
+delegateStreamObserver.terminate(
+new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+delegateStreamObserver = null;
+isCurrentStreamClosed = true;
+  }
+}
+  }
+
+  public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownException {
+// Make sure onNext and onError below to be called on the same StreamObserver instance.
+StreamObserver delegate = delegate();
+try {
+  // Do NOT lock while sending message over the stream as this will block other StreamObserver
+  // operation

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-11 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837625329


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+
+/**
+ * Request observer that allows resetting its internal delegate using the given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
+ * {@link ThreadSafe}. Has same methods declared in {@link StreamObserver}, but they throw
+ * {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which much be
+ * handled by callers.
+ */
+@ThreadSafe
+@Internal
+final class ResettableThrowingStreamObserver {
+  private final Supplier> streamObserverFactory;
+  private final Logger logger;
+
+  @GuardedBy("this")
+  private @Nullable TerminatingStreamObserver delegateStreamObserver;
+
+  @GuardedBy("this")
+  private boolean isPoisoned = false;
+
+  /**
+   * Indicates that the current delegate is closed via {@link #poison() or {@link #onCompleted()}}.
+   * If not poisoned, a call to {@link #reset()} is required to perform future operations on the
+   * StreamObserver.
+   */
+  @GuardedBy("this")
+  private boolean isCurrentStreamClosed = false;
+
+  ResettableThrowingStreamObserver(
+  Supplier> streamObserverFactory, Logger logger) {
+this.streamObserverFactory = streamObserverFactory;
+this.logger = logger;
+this.delegateStreamObserver = null;
+  }
+
+  private synchronized StreamObserver delegate()
+  throws WindmillStreamShutdownException, StreamClosedException {
+if (isPoisoned) {
+  throw new WindmillStreamShutdownException("Stream is already shutdown.");
+}
+
+if (isCurrentStreamClosed) {
+  throw new StreamClosedException(
+  "Current stream is closed, requires reset for future stream operations.");
+}
+
+return Preconditions.checkNotNull(
+delegateStreamObserver,
+"requestObserver cannot be null. Missing a call to startStream() to initialize.");

Review Comment:
   done
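A simplified, dependency-free sketch of the reset/poison contract described in the `ResettableThrowingStreamObserver` javadoc above. `Sink` stands in for gRPC's `StreamObserver`, and `ShutdownException`/`ClosedException` stand in for `WindmillStreamShutdownException`/`StreamClosedException`; all names here are hypothetical. As in the quoted diff, the delegate is resolved under the lock but the send itself happens outside it.

```java
import java.util.function.Supplier;

/** Stand-in for gRPC's StreamObserver so the sketch has no dependencies (hypothetical). */
interface Sink<T> {
  void onNext(T value);

  void onCompleted();
}

/** Hypothetical: thrown once the wrapper is poisoned; the stream must not be used again. */
final class ShutdownException extends Exception {
  ShutdownException(String message) {
    super(message);
  }
}

/** Hypothetical: the current delegate is closed and reset() is needed before sending. */
final class ClosedException extends Exception {
  ClosedException(String message) {
    super(message);
  }
}

/** Hypothetical, simplified resettable and poisonable wrapper around a Sink. */
final class ResettableSink<T> {
  private final Supplier<Sink<T>> factory;
  private Sink<T> currentDelegate; // guarded by this
  private boolean poisoned = false; // guarded by this
  private boolean currentClosed = false; // guarded by this

  ResettableSink(Supplier<Sink<T>> factory) {
    this.factory = factory;
  }

  private synchronized Sink<T> delegate() throws ShutdownException, ClosedException {
    if (poisoned) {
      throw new ShutdownException("Stream is already shutdown.");
    }
    if (currentClosed) {
      throw new ClosedException("Current stream is closed; reset() is required.");
    }
    if (currentDelegate == null) {
      throw new IllegalStateException("reset() must be called before first use.");
    }
    return currentDelegate;
  }

  /** Creates a fresh delegate unless the wrapper has been poisoned. */
  synchronized void reset() throws ShutdownException {
    if (poisoned) {
      throw new ShutdownException("Stream is already shutdown.");
    }
    currentDelegate = factory.get();
    currentClosed = false;
  }

  /** Permanently disables the wrapper; later calls throw ShutdownException. */
  synchronized void poison() {
    poisoned = true;
    currentDelegate = null;
    currentClosed = true;
  }

  void onNext(T value) throws ShutdownException, ClosedException {
    // Resolve the delegate under the lock, but send outside it so a slow send
    // does not block reset() or poison().
    Sink<T> sink = delegate();
    sink.onNext(value);
  }

  void onCompleted() throws ShutdownException, ClosedException {
    Sink<T> sink;
    synchronized (this) {
      sink = delegate();
      currentClosed = true;
    }
    sink.onCompleted();
  }
}
```

The two exception types keep the caller's decision simple: `ClosedException` means "restart and retry", while `ShutdownException` means "stop using this stream".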






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-11 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837618004


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java:
##
@@ -59,6 +59,9 @@ final class StreamDebugMetrics {
   @GuardedBy("this")
   private DateTime shutdownTime = null;

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-11 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837608239


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) {
 }
 
 /** Returns true if the stream was torn down and should not be restarted internally. */
-private synchronized boolean maybeTeardownStream() {
-  if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) {
-streamRegistry.remove(AbstractWindmillStream.this);
-finishLatch.countDown();
-executor.shutdownNow();
-return true;
-  }
+private boolean maybeTeardownStream() {
+  synchronized (AbstractWindmillStream.this) {

Review Comment:
   fixed by moving it out of the inner class
   GuardedBy check was failing since reference to `this` was ambiguous



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) {
 }
 
 /** Returns true if the stream was torn down and should not be restarted internally. */
-private synchronized boolean maybeTeardownStream() {
-  if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) {
-streamRegistry.remove(AbstractWindmillStream.this);
-finishLatch.countDown();
-executor.shutdownNow();
-return true;
-  }
+private boolean maybeTeardownStream() {
+  synchronized (AbstractWindmillStream.this) {
+if (isShutdown || (clientClosed && !hasPendingRequests())) {
+  streamRegistry.remove(AbstractWindmillStream.this);

Review Comment:
   fixed by moving it out of the inner class
   GuardedBy check was failing since reference to `this` was ambiguous
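The `this` ambiguity mentioned above comes from Java's inner-class semantics: inside a non-static inner class, `this` (and a `synchronized` method) refers to the inner instance, so it does not guard the outer class's fields. A small sketch with hypothetical names, using `Thread.holdsLock` to show which monitor is actually held:

```java
/** Hypothetical outer/inner pair showing which monitor each kind of synchronization takes. */
final class OuterStream {
  private boolean isShutdown = false; // meant to be guarded by OuterStream.this

  final class ResponseObserver {
    /** A synchronized method here locks the ResponseObserver, not OuterStream.this. */
    synchronized boolean synchronizedMethodHoldsOuterLock() {
      return Thread.holdsLock(OuterStream.this);
    }

    /** Locking the qualified outer instance does guard OuterStream's fields. */
    boolean qualifiedThisHoldsOuterLock() {
      synchronized (OuterStream.this) {
        return Thread.holdsLock(OuterStream.this);
      }
    }
  }

  /** Declared on the outer class, "this" unambiguously means the OuterStream instance. */
  synchronized boolean maybeTeardown() {
    return isShutdown;
  }

  synchronized void shutdown() {
    isShutdown = true;
  }
}
```

Moving the method to the outer class, as done in the reply above, lets a plain `synchronized` method and `@GuardedBy("this")` agree on the same lock.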






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-11 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837603481


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -272,7 +280,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
 summaryMetrics.timeSinceLastSend(),
 summaryMetrics.timeSinceLastResponse(),
 requestObserver.isClosed(),
-hasReceivedShutdownSignal(),
+summaryMetrics.shutdownTime().isPresent(),

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-08 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1833955728


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) {
 }
 
 /** Returns true if the stream was torn down and should not be restarted internally. */
-private synchronized boolean maybeTeardownStream() {
-  if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) {
-streamRegistry.remove(AbstractWindmillStream.this);
-finishLatch.countDown();
-executor.shutdownNow();
-return true;
-  }
+private boolean maybeTeardownStream() {
+  synchronized (AbstractWindmillStream.this) {

Review Comment:
   just make a synchronized method?



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java:
##
@@ -59,6 +59,9 @@ final class StreamDebugMetrics {
   @GuardedBy("this")
   private DateTime shutdownTime = null;

Review Comment:
   Mark this field (and the one above it) as nullable.
   
   I think you still need to fix the over-exemption of windmill classes from the checker:
   https://github.com/apache/beam/issues/30183



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -272,7 +280,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
 summaryMetrics.timeSinceLastSend(),
 summaryMetrics.timeSinceLastResponse(),
 requestObserver.isClosed(),
-hasReceivedShutdownSignal(),
+summaryMetrics.shutdownTime().isPresent(),

Review Comment:
   nit: could remove this part since it is redundant with the null value. Could render null differently, maybe something like
   
   summaryMetrics.shutdownTime().map(Instant::toString).orElse("not shutdown")
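A sketch of the suggested rendering, assuming the shutdown time is exposed as an `Optional`. It uses `java.time.Instant` to stay self-contained, whereas the quoted `StreamDebugMetrics` field is a `DateTime`; `ShutdownTimeRenderer` is a hypothetical name.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical helper rendering the shutdown time for a status page. */
final class ShutdownTimeRenderer {
  private ShutdownTimeRenderer() {}

  /** Prints the timestamp if present; otherwise a fixed marker instead of "null". */
  static String render(Optional<Instant> shutdownTime) {
    return shutdownTime.map(Instant::toString).orElse("not shutdown");
  }
}
```

One column then carries both facts (whether and when the stream shut down), which is why the separate boolean column becomes redundant.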
   
   



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) {
 }
 
 /** Returns true if the stream was torn down and should not be restarted internally. */
-private synchronized boolean maybeTeardownStream() {
-  if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) {
-streamRegistry.remove(AbstractWindmillStream.this);
-finishLatch.countDown();
-executor.shutdownNow();
-return true;
-  }
+private boolean maybeTeardownStream() {
+  synchronized (AbstractWindmillStream.this) {
+if (isShutdown || (clientClosed && !hasPendingRequests())) {
+  streamRegistry.remove(AbstractWindmillStream.this);

Review Comment:
   why doesn't just "this" work?



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+
+/**
+ * Request observer that allows resetting its internal delegate using the given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
+ * {@link ThreadSafe}. Has same methods declared in {@link StreamObserver

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831934329


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831921566


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -67,61 +69,86 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
 int awaitPhase = -1;
 long totalSecondsWaited = 0;
 long waitSeconds = 1;
 while (true) {
   try {
 synchronized (lock) {
+  int currentPhase = isReadyNotifier.getPhase();
+  // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+  // are synchronized after terminating the phaser if we observe that the phaser is not
+  // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+  if (currentPhase < 0) return;
+
+  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+  // careful to observe the phase before observing isReady.
+  if (awaitPhase < 0) {
+awaitPhase = isReadyNotifier.getPhase();
+// If getPhase() returns a value less than 0, the phaser has been terminated.
+if (awaitPhase < 0) {
+  return;
+}
+  }
+
   // We only check isReady periodically to effectively allow for increasing the outbound
   // buffer periodically. This reduces the overhead of blocking while still restricting
   // memory because there is a limited # of streams, and we have a max messages size of 2MB.
   if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
 outboundObserver.onNext(value);
 return;
   }
-  // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-  // careful to observe the phase before observing isReady.
-  if (awaitPhase < 0) {
-awaitPhase = phaser.getPhase();
-  }
+
   if (outboundObserver.isReady()) {
 messagesSinceReady = 0;
 outboundObserver.onNext(value);
 return;
   }
 }
+
 // A callback has been registered to advance the phaser whenever the observer
 // transitions to  is ready. Since we are waiting for a phase observed before the
 // outboundObserver.isReady() returned false, we expect it to advance after the
 // channel has become ready.  This doesn't always seem to be the case (despite
 // documentation stating otherwise) so we poll periodically and enforce an overall
 // timeout related to the stream deadline.
-phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+int nextPhase =
+isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+// If nextPhase is a value less than 0, the phaser has been terminated.
+if (nextPhase < 0) {
+  return;

Review Comment:
   done
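The termination handling in `onNext` relies on two `java.util.concurrent.Phaser` behaviors: `getPhase()` is negative once the phaser is terminated, and `awaitAdvanceInterruptibly` returns immediately with a negative phase after `forceTermination()` (or with the current phase if it has already moved past the awaited one). A minimal sketch of that readiness gate; `ReadinessGate` is hypothetical, and `signalReady()` stands in for the transport's on-ready callback.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical readiness gate modeled on the isReadyNotifier usage quoted above. */
final class ReadinessGate {
  // A single registered party: each arrive() advances the phase by one.
  private final Phaser isReadyNotifier = new Phaser(1);

  /** Stands in for the transport's on-ready callback. */
  void signalReady() {
    isReadyNotifier.arrive();
  }

  /** Releases any thread blocked in awaitReady(); the phase becomes negative. */
  void terminate() {
    isReadyNotifier.forceTermination();
  }

  /** Observe this before checking isReady(), as the quoted comment requires. */
  int currentPhase() {
    return isReadyNotifier.getPhase();
  }

  /**
   * Waits up to timeoutSeconds for observedPhase to advance. Returns true once it has
   * advanced, false if the gate is (or becomes) terminated. A TimeoutException lets the
   * caller poll periodically and enforce its own overall deadline.
   */
  boolean awaitReady(int observedPhase, long timeoutSeconds)
      throws InterruptedException, TimeoutException {
    if (observedPhase < 0) {
      return false; // already terminated when the phase was observed
    }
    int nextPhase =
        isReadyNotifier.awaitAdvanceInterruptibly(observedPhase, timeoutSeconds, TimeUnit.SECONDS);
    return nextPhase >= 0;
  }
}
```

Observing the phase before checking readiness is what prevents a lost wake-up: if the ready callback fires in between, the phase has already advanced and the wait returns at once.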






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831907579


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */
+public final class WindmillStreamShutdownException extends Exception {

Review Comment:
   I think it's also nice to have both, since we usually ignore the StreamClosedException and we always propagate the WindmillStreamShutdownException.
   
   It might be useful to just have send return true/false instead of throwing StreamClosed?
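One way to express the suggestion above: report a closed (restartable) stream through the return value and keep the checked exception only for shutdown. This is a hypothetical sketch of that API shape, not necessarily what the PR adopted; `BooleanSender` and `StreamShutdownException` are illustrative names.

```java
/** Hypothetical: thrown when the stream was shut down; callers are expected to propagate it. */
final class StreamShutdownException extends Exception {
  StreamShutdownException(String message) {
    super(message);
  }
}

/** Hypothetical sender: a closed (restartable) stream is reported via the return value. */
final class BooleanSender {
  private boolean shutdown = false; // guarded by this
  private boolean currentStreamClosed = false; // guarded by this
  private int sent = 0; // guarded by this

  /**
   * Returns true if the message was handed to the current stream, false if the current
   * stream is closed and the message should be retried after the stream restarts.
   */
  synchronized boolean trySend(String message) throws StreamShutdownException {
    if (shutdown) {
      throw new StreamShutdownException("Stream is shutdown; not sending " + message);
    }
    if (currentStreamClosed) {
      return false;
    }
    sent++;
    return true;
  }

  synchronized void closeCurrentStream() {
    currentStreamClosed = true;
  }

  synchronized void restart() {
    currentStreamClosed = false;
  }

  synchronized void shutdown() {
    shutdown = true;
  }

  synchronized int sentCount() {
    return sent;
  }
}
```

The trade-off is that a boolean is easy to ignore silently, whereas a checked exception forces each caller to decide what a closed stream means for it.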






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831905512


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */
+public final class WindmillStreamShutdownException extends Exception {

Review Comment:
   It's not really retryable; it just retries internally to flush any pending messages. So I just updated the comment and moved both exceptions into `WindmillStreamExceptions.java`.






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831900037


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -139,4 +166,29 @@ public void onCompleted() {
   outboundObserver.onCompleted();
 }
   }
+
+  @Override
+  public void terminate(Throwable terminationException) {
+// Free the blocked threads in onNext().
+isReadyNotifier.forceTermination();

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831884637


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */
+public final class WindmillStreamShutdownException extends Exception {

Review Comment:
   Renamed StreamClosedException to RetryableStreamClosedException, since we will try to flush all of the pending requests in this case. For shutdown, it means that we are no longer operating with the stream, as it is invalid.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831883547


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -139,4 +166,29 @@ public void onCompleted() {
   outboundObserver.onCompleted();

Review Comment:
   Looking at how this is used in StreamObserverFactory, we call phaser.forceTermination in `ForwardingClientResponseObserver#onDone`. Wouldn't that mean we always call onError here, since the onDoneHandler runs and terminates the StreamObserver?
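One way to make a late `onError` harmless is the `if (!isClosed)` guard adopted elsewhere in this PR for `DirectStreamObserver#onError`: only the first terminal call reaches the delegate, and later ones become no-ops instead of throwing. A minimal sketch, assuming a hypothetical `SimpleObserver` interface in place of gRPC's `StreamObserver`; `GuardedObserver` and `RecordingObserver` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal observer interface, standing in for gRPC's StreamObserver. */
interface SimpleObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

/** Forwards to a delegate but lets only the first terminal call (onError/onCompleted) through. */
class GuardedObserver<T> implements SimpleObserver<T> {
  private final Object lock = new Object();
  private final SimpleObserver<T> delegate;
  private boolean isClosed = false;

  GuardedObserver(SimpleObserver<T> delegate) { this.delegate = delegate; }

  @Override
  public void onNext(T value) {
    synchronized (lock) {
      if (isClosed) {
        throw new IllegalStateException("onNext called after the stream was closed");
      }
      delegate.onNext(value);
    }
  }

  @Override
  public void onError(Throwable t) {
    synchronized (lock) {
      if (!isClosed) { // a later onError, e.g. from a termination path, is a no-op
        isClosed = true;
        delegate.onError(t);
      }
    }
  }

  @Override
  public void onCompleted() {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        delegate.onCompleted();
      }
    }
  }
}

/** Records events so the behavior can be checked. */
class RecordingObserver implements SimpleObserver<String> {
  final List<String> events = new ArrayList<>();
  public void onNext(String value) { events.add("next:" + value); }
  public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
  public void onCompleted() { events.add("completed"); }
}
```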






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831879138


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##
@@ -139,4 +166,29 @@ public void onCompleted() {
   outboundObserver.onCompleted();
 }
   }
+
+  @Override
+  public void terminate(Throwable terminationException) {
+// Free the blocked threads in onNext().
+isReadyNotifier.forceTermination();
+try {
+  onError(terminationException);
+} catch (RuntimeException e) {
+  // If onError or onComplete was previously called, this will throw.

Review Comment:
   done
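`terminate()` in the hunk above calls `isReadyNotifier.forceTermination()` to free threads blocked in `onNext()`. Assuming the notifier behaves like a `java.util.concurrent.Phaser` (the thread refers to it as a phaser), the sketch below shows why this works: `awaitAdvance` returns a negative value once the phaser is terminated, so a blocked sender wakes up and can bail out. `ReadinessGate` is a hypothetical class, not the PR's code.

```java
import java.util.concurrent.Phaser;

class ReadinessGate {
  // One registered party; each ready signal advances the phase.
  private final Phaser phaser = new Phaser(1);

  /** Blocks until the next ready signal; returns false if the gate was terminated instead. */
  boolean awaitReady() {
    int phase = phaser.getPhase();
    if (phase < 0) {
      return false; // already terminated
    }
    // Returns the next phase number, or a negative value if terminated while waiting.
    return phaser.awaitAdvance(phase) >= 0;
  }

  /** Called when the transport becomes ready; releases current waiters. */
  void signalReady() { phaser.arrive(); }

  /** Releases all waiters permanently, e.g. on stream termination. */
  void terminate() { phaser.forceTermination(); }
}
```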






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831875706


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+return isShutdown;
   }
 
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+  throws StreamClosedException, WindmillStreamShutdownException {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
 synchronized (this) {
-  if (streamClosed.get()) {
-throw new IllegalStateException("Send called on a client closed stream.");
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
   }
+}
 
-  requestObserver().onNext(request);
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  debugMetrics.recordStart();
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown while creating new stream.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",
+  getClass());
+  break;
+} catch (IOException ioe) {
   // Keep trying to create the stream.
 }
   }
 }
+
+// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+// removed when closed.
+streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+try {
+  executor.execute(runnable);
+} catch (RejectedExecutionException e) {
+  logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+}
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
   try {
 sendHealthCheck();
-  } catch (RuntimeException e) {
-LOG.debug("Received exception sending health check.", e);
+  } catch (Exception e) {
+logger.debug("Received exception sending health check.", e);
   }
 }
   }
 
-  protected abstract void sendHealthCheck();
+  protected abstract void sendHealthCheck()
+  throws WindmillStreamShutdownException, StreamClosedException;
 
-  // Care is taken that synchronization on this is unnecessary for all status page information.
-  // Blocking s

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831869718


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+return isShutdown;
   }
 
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+  throws StreamClosedException, WindmillStreamShutdownException {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
 synchronized (this) {
-  if (streamClosed.get()) {
-throw new IllegalStateException("Send called on a client closed stream.");
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
   }
+}
 
-  requestObserver().onNext(request);
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  debugMetrics.recordStart();
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown while creating new stream.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",
+  getClass());
+  break;
+} catch (IOException ioe) {
   // Keep trying to create the stream.
 }
   }
 }
+
+// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+// removed when closed.
+streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+try {
+  executor.execute(runnable);
+} catch (RejectedExecutionException e) {
+  logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+}
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {

Review Comment:
   done
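The reworked `maybeSendHealthCheck` drops `synchronized` and reads `clientClosed` (a `volatile` field) plus the last-send timestamp without taking the stream lock, so a periodic health-check sweep need not block behind a stream stuck in a slow gRPC call. A minimal sketch of that pattern, assuming a hypothetical `HealthCheckedStream` with an `AtomicLong` timestamp in place of `StreamDebugMetrics` and a counter in place of a real health-check request:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class HealthCheckedStream {
  private volatile boolean clientClosed = false;
  private final AtomicLong lastSendTimeMs = new AtomicLong();
  final AtomicInteger healthChecksSent = new AtomicInteger();

  /** Records a send; in a real stream this happens under the stream lock. */
  synchronized void send(long nowMs) {
    lastSendTimeMs.set(nowMs);
  }

  void halfClose() { clientClosed = true; }

  /** Lock-free check: only reads a volatile flag and an atomic timestamp. */
  void maybeSendHealthCheck(long lastSendThresholdMs) {
    if (!clientClosed && lastSendTimeMs.get() < lastSendThresholdMs) {
      try {
        sendHealthCheck();
      } catch (Exception e) {
        // As in the diff, failures are only logged (at debug level) and otherwise ignored.
      }
    }
  }

  private void sendHealthCheck() { healthChecksSent.incrementAndGet(); }
}
```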






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831869554


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -49,46 +44,52 @@
  * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
  * broken stream.
  *
- * Subclasses should override onResponse to handle responses from the server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as sending headers or
- * retrying requests.
+ * Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
+ * such as sending headers or retrying requests.
  *
- * send and startStream should not be called from onResponse; use executor() instead.
+ * {@link #send(RequestT)} and {@link #startStream()} should not be called from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * Synchronization on this is used to synchronize the gRpc stream state and internal data
  * structures. Since grpc channel operations may block, synchronization on this stream may also
  * block. This is generally not a problem since streams are used in a single-threaded manner.
  * However, some accessors used for status page and other debugging need to take care not to require
  * synchronizing on this.
+ *
+ * {@link #start()} and {@link #shutdown()} are called once in the lifetime of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be restarted/reused.
  */
public abstract class AbstractWindmillStream implements WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response";
+  protected final Sleeper sleeper;
+
+  private final Logger logger;
+  private final ExecutorService executor;
   private final BackOff backoff;
-  private final AtomicLong startTimeMs;
-  private final AtomicLong lastResponseTimeMs;
-  private final AtomicInteger errorCount;
-  private final AtomicReference lastError;
-  private final AtomicReference lastErrorTime;
-  private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver requestObserver;
+  private final ResettableThrowingStreamObserver requestObserver;
+  private final StreamDebugMetrics debugMetrics;
+  protected volatile boolean clientClosed;

Review Comment:
   done
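The class Javadoc in this hunk says `start()` and `shutdown()` are each called once and that a shut-down stream cannot be restarted; the `start()` diff enforces this by flipping `started` under the lock and doing the slow `startStream()` work outside it. A minimal sketch of that check-then-act pattern, with a hypothetical `OneShotLifecycle` and a counter standing in for `startStream()`:

```java
class OneShotLifecycle {
  private boolean started = false;
  private boolean isShutdown = false;
  int startStreamCalls = 0; // stands in for the expensive startStream() work

  public void start() {
    boolean shouldStartStream = false;
    synchronized (this) {
      // Decide under the lock: at most one caller wins, and never after shutdown.
      if (!isShutdown && !started) {
        started = true;
        shouldStartStream = true;
      }
    }
    if (shouldStartStream) {
      startStream(); // runs outside the lock so other callers are not blocked
    }
  }

  public synchronized void shutdown() {
    isShutdown = true;
  }

  private void startStream() {
    startStreamCalls++; // only the single winning caller reaches this
  }
}
```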






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831723961


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -314,24 +417,35 @@ private Batcher() {
 @Override
 public boolean commitWorkItem(
 String computation, WorkItemCommitRequest commitRequest, Consumer onDone) {
-  if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
+  if (!canAccept(commitRequest.getSerializedSize() + computation.length())
+  || hasReceivedShutdownSignal()) {
 return false;
   }
-  PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
+
+  PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
   add(idGenerator.incrementAndGet(), request);
   return true;
 }
 
 /** Flushes any pending work items to the wire. */
 @Override
 public void flush() {
-  flushInternal(queue);
-  queuedBytes = 0;
-  queue.clear();
+  try {
+if (!hasReceivedShutdownSignal()) {

Review Comment:
   done
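In the `Batcher` diff, `commitWorkItem` refuses new work once `hasReceivedShutdownSignal()` is true, and `flush` checks the same signal before touching the wire, so callers get `false` back instead of silently queuing onto a dead stream. A minimal sketch under assumed names (`SketchBatcher`, a byte budget, and a list standing in for the wire). Clearing the queue in a `finally` block is this sketch's assumption, since the diff is truncated at that point.

```java
import java.util.ArrayList;
import java.util.List;

class SketchBatcher {
  private final long maxBytes;
  private final List<String> queue = new ArrayList<>();
  private long queuedBytes = 0;
  private volatile boolean shutdown = false;
  final List<List<String>> flushedBatches = new ArrayList<>(); // stands in for the wire

  SketchBatcher(long maxBytes) { this.maxBytes = maxBytes; }

  /** Returns false when the item does not fit or the stream was shut down. */
  boolean commitWorkItem(String item) {
    if (queuedBytes + item.length() > maxBytes || shutdown) {
      return false;
    }
    queue.add(item);
    queuedBytes += item.length();
    return true;
  }

  /** Sends queued items unless shut down; the queue is cleared either way. */
  void flush() {
    try {
      if (!shutdown && !queue.isEmpty()) {
        flushedBatches.add(new ArrayList<>(queue));
      }
    } finally {
      queue.clear();
      queuedBytes = 0;
    }
  }

  void shutdown() { shutdown = true; }
}
```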






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831723273


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -314,24 +417,35 @@ private Batcher() {
 @Override
 public boolean commitWorkItem(
 String computation, WorkItemCommitRequest commitRequest, Consumer onDone) {
-  if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
+  if (!canAccept(commitRequest.getSerializedSize() + computation.length())
+  || hasReceivedShutdownSignal()) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831722466


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
 commitWorkThrottleTimer.stop();
 
-RuntimeException finalException = null;
+CommitCompletionException failures = new CommitCompletionException();

Review Comment:
   done
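`onResponse` now collects per-request failures into one `CommitCompletionException` instead of keeping a single `finalException`, so one failing `onDone` callback does not hide the others and every callback still runs. A minimal sketch of that aggregation, assuming the collector is built on `Throwable.addSuppressed`; `SketchCommitCompletionException` and `CommitCallbacks` are hypothetical names.

```java
import java.util.List;
import java.util.function.Consumer;

class SketchCommitCompletionException extends RuntimeException {
  SketchCommitCompletionException() {
    super("Exception while processing commit response.");
  }

  void addFailure(Throwable t) { addSuppressed(t); }

  boolean hasFailures() { return getSuppressed().length > 0; }

  void throwIfNonEmpty() {
    if (hasFailures()) {
      throw this;
    }
  }
}

class CommitCallbacks {
  /** Runs every callback, then reports all failures together. */
  static void completeAll(List<Consumer<String>> callbacks, String status) {
    SketchCommitCompletionException failures = new SketchCommitCompletionException();
    for (Consumer<String> onDone : callbacks) {
      try {
        onDone.accept(status);
      } catch (RuntimeException e) {
        failures.addFailure(e); // keep going so later callbacks still run
      }
    }
    failures.throwIfNonEmpty();
  }
}
```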






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831720486


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
 commitWorkThrottleTimer.stop();
 
-RuntimeException finalException = null;
+CommitCompletionException failures = new CommitCompletionException();
 for (int i = 0; i < response.getRequestIdCount(); ++i) {
   long requestId = response.getRequestId(i);
   if (requestId == HEARTBEAT_REQUEST_ID) {
 continue;
   }
-  PendingRequest done = pending.remove(requestId);
-  if (done == null) {
-LOG.error("Got unknown commit request ID: {}", requestId);
+  PendingRequest pendingRequest = pending.remove(requestId);
+  CommitStatus commitStatus =
+  i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
+  if (pendingRequest == null) {
+if (!hasReceivedShutdownSignal()) {
+  // Skip responses when the stream is shutdown since they are now invalid.

Review Comment:
   done
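The loop in this hunk pairs each request ID with a status, defaulting to `CommitStatus.OK` when the response carries fewer statuses than IDs, and only treats an unknown ID as an error while the stream is live. A minimal sketch of that loop with plain collections; the heartbeat sentinel value, the `String` statuses, and `SketchResponseHandler` are assumptions, and the real code logs rather than recording unknown IDs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SketchResponseHandler {
  static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE; // assumed sentinel value
  final Map<Long, Consumer<String>> pending = new HashMap<>();
  final List<Long> unknownIds = new ArrayList<>();
  volatile boolean shutdown = false;

  void onResponse(List<Long> requestIds, List<String> statuses) {
    for (int i = 0; i < requestIds.size(); ++i) {
      long requestId = requestIds.get(i);
      if (requestId == HEARTBEAT_REQUEST_ID) {
        continue;
      }
      Consumer<String> onDone = pending.remove(requestId);
      // Missing trailing statuses are treated as OK.
      String status = i < statuses.size() ? statuses.get(i) : "OK";
      if (onDone == null) {
        if (!shutdown) {
          unknownIds.add(requestId); // after shutdown, stale responses are expected and skipped
        }
        continue;
      }
      onDone.accept(status);
    }
  }
}
```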






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831708278


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+return isShutdown;
   }
 
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+  throws StreamClosedException, WindmillStreamShutdownException {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
 synchronized (this) {
-  if (streamClosed.get()) {
-throw new IllegalStateException("Send called on a client closed stream.");
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
   }
+}
 
-  requestObserver().onNext(request);
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  debugMetrics.recordStart();
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown while creating new stream.", e);

Review Comment:
   done
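The catch block commented on here handles a shutdown that arrives while a new stream is being created. A minimal sketch of the surrounding retry loop, assuming that a shutdown should end the loop (this sketch breaks out explicitly), that ordinary failures back off and retry, that an interrupt during backoff restores the interrupt flag and gives up, and that a stream which never started is removed from the registry. `SketchRestartingStream`, `Opener`, and the fixed backoff are hypothetical stand-ins for `onNewStream()`, `BackOff`, and `Sleeper`.

```java
import java.util.Set;

class SketchShutdownException extends Exception {}

class SketchRestartingStream {
  /** Hypothetical hook that opens the physical stream and sends headers. */
  interface Opener {
    void open() throws Exception;
  }

  private final Set<SketchRestartingStream> registry;
  private final Opener opener;
  private final long backoffMs;
  int attempts = 0;

  SketchRestartingStream(Set<SketchRestartingStream> registry, Opener opener, long backoffMs) {
    this.registry = registry;
    this.opener = opener;
    this.backoffMs = backoffMs;
  }

  void startStream() {
    registry.add(this);
    while (true) {
      attempts++;
      try {
        synchronized (this) {
          opener.open();
          return; // started: stays registered until the stream is closed
        }
      } catch (SketchShutdownException e) {
        // shutdown() is responsible for cleaning up pending requests; stop retrying.
        break;
      } catch (Exception e) {
        try {
          Thread.sleep(backoffMs); // fixed backoff stands in for BackOff/Sleeper
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    // Never started: unregister here, since close() will never run for it.
    registry.remove(this);
  }
}
```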






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831713003


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -121,32 +130,49 @@ public static GrpcGetDataStream create(
   int streamingRpcBatchLimit,
   boolean sendKeyedGetDataRequests,
   Consumer> processHeartbeatResponses) {
-GrpcGetDataStream getDataStream =
-new GrpcGetDataStream(
-backendWorkerToken,
-startGetDataRpcFn,
-backoff,
-streamObserverFactory,
-streamRegistry,
-logEveryNStreamFailures,
-getDataThrottleTimer,
-jobHeader,
-idGenerator,
-streamingRpcBatchLimit,
-sendKeyedGetDataRequests,
-processHeartbeatResponses);
-getDataStream.startStream();
-return getDataStream;
+return new GrpcGetDataStream(
+backendWorkerToken,
+startGetDataRpcFn,
+backoff,
+streamObserverFactory,
+streamRegistry,
+logEveryNStreamFailures,
+getDataThrottleTimer,
+jobHeader,
+idGenerator,
+streamingRpcBatchLimit,
+sendKeyedGetDataRequests,
+processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) {
+return new WindmillStreamShutdownException(
+"Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
+  }
+
+  private static WindmillStreamShutdownException shutdownExceptionFor(QueuedRequest request) {
+return new WindmillStreamShutdownException(
+"Cannot send request=[" + request + "] on closed stream.");
+  }
+
+  private void sendIgnoringClosed(StreamingGetDataRequest getDataRequest)
+  throws WindmillStreamShutdownException {
+try {
+  send(getDataRequest);
+} catch (StreamClosedException e) {
+  // Stream was closed on send, will be retried on stream restart.
+}
   }
 
   @Override
-  protected synchronized void onNewStream() {
+  protected synchronized void onNewStream()
+  throws StreamClosedException, WindmillStreamShutdownException {
 send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
-if (clientClosed.get()) {
+if (clientClosed && !hasReceivedShutdownSignal()) {

Review Comment:
   done
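`sendIgnoringClosed` swallows `StreamClosedException` because, per its comment, the request "will be retried on stream restart", but lets `WindmillStreamShutdownException` propagate, since a shut-down stream is never restarted. A minimal sketch of that wrapper with hypothetical stand-in exception types and a functional `Sender`; returning a `boolean` is this sketch's addition so the outcome can be observed.

```java
class SketchStreamClosedException extends Exception {}

class SketchShutdownException extends Exception {}

/** Hypothetical sender that can fail in either way. */
interface Sender<T> {
  void send(T request) throws SketchStreamClosedException, SketchShutdownException;
}

class SendHelpers {
  /** Returns true if sent, false if the stream closed and the request awaits a restart. */
  static <T> boolean sendIgnoringClosed(Sender<T> sender, T request)
      throws SketchShutdownException {
    try {
      sender.send(request);
      return true;
    } catch (SketchStreamClosedException e) {
      // Stream was closed on send; the request is retried when the stream restarts.
      return false;
    }
  }
}
```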






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831709979


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+return isShutdown;
   }
 
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+  throws StreamClosedException, WindmillStreamShutdownException {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
 synchronized (this) {
-  if (streamClosed.get()) {
-throw new IllegalStateException("Send called on a client closed stream.");
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
   }
+}
 
-  requestObserver().onNext(request);
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  debugMetrics.recordStart();
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown while creating new stream.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",

Review Comment:
   We shut down the executor in maybeTeardownStream(), so this would clean up any dangling tasks in the executor.
   Let's shut down the stream.
   
   done
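Once the executor has been shut down during teardown, any further `execute` call is rejected; that is why the diff wraps submissions in `executeSafely`, which catches `RejectedExecutionException` instead of letting it escape. A minimal sketch using a standard `ExecutorService`; `SafeExecutor`, the boolean return value, and the use of `shutdownNow()` for teardown are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class SafeExecutor {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  /** Returns false instead of throwing when the executor has already been shut down. */
  boolean executeSafely(Runnable runnable) {
    try {
      executor.execute(runnable);
      return true;
    } catch (RejectedExecutionException e) {
      // The stream was torn down; dropping the task is the intended behavior.
      return false;
    }
  }

  void teardown() {
    executor.shutdownNow(); // interrupts running tasks and drops queued ones
  }
}
```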






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1831707027


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -301,39 +343,59 @@ public void appendSpecificHtml(PrintWriter writer) {
 writer.append("]");
   }
 
-  private  ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) {
-while (true) {
+  private  ResponseT issueRequest(QueuedRequest request, ParseFn parseFn)
+  throws WindmillStreamShutdownException {
+while (!hasReceivedShutdownSignal()) {
   request.resetResponseStream();
   try {
 queueRequestAndWait(request);
 return parseFn.parse(request.getResponseStream());
-  } catch (CancellationException e) {
-// Retry issuing the request since the response stream was cancelled.
-continue;
+  } catch (AppendableInputStream.InvalidInputStreamStateException | CancellationException e) {
+handleShutdown(request, e);
+if (!(e instanceof CancellationException)) {
+  throw e;
+}
   } catch (IOException e) {
 LOG.error("Parsing GetData response failed: ", e);
-continue;
   } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
+handleShutdown(request, e);
 throw new RuntimeException(e);
   } finally {
 pending.remove(request.id());
   }
 }
+
+throw new WindmillStreamShutdownException(

Review Comment:
   done
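`issueRequest` now loops only while no shutdown signal has been received: a cancelled response stream is retried, and once shutdown is observed the method throws `WindmillStreamShutdownException` instead of spinning forever. A simplified sketch of that control flow; `RequestIssuer`, the `Supplier`-based attempt, and the stand-in exception are assumptions, and the diff's `handleShutdown` and `InvalidInputStreamStateException` paths are omitted.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class SketchShutdownException extends Exception {
  SketchShutdownException(String message) { super(message); }
}

class RequestIssuer {
  final AtomicBoolean shutdown = new AtomicBoolean(false);

  /** Retries cancelled attempts until one succeeds or shutdown is observed. */
  <T> T issueRequest(Supplier<T> attempt) throws SketchShutdownException {
    while (!shutdown.get()) {
      try {
        return attempt.get();
      } catch (CancellationException e) {
        // The response stream was cancelled (e.g. the stream restarted); try again.
      }
    }
    throw new SketchShutdownException("Cannot send request on a shutdown stream.");
  }
}
```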






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-06 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830671651


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+return isShutdown;
   }
 
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+  throws StreamClosedException, WindmillStreamShutdownException {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
 synchronized (this) {
-  if (streamClosed.get()) {
-throw new IllegalStateException("Send called on a client closed stream.");
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
   }
+}
 
-  requestObserver().onNext(request);
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  debugMetrics.recordStart();
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown while creating new stream.", e);

Review Comment:
   // shutdown() is responsible for cleaning up pending requests.
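The reworked `start()` makes startup idempotent: the `isShutdown`/`started` flags are checked and flipped while the lock is held, and the potentially blocking `startStream()` runs only after the lock is released. The following is a minimal dependency-free sketch of that pattern, not Beam code: `LifecycleSketch` is a hypothetical class, and a call counter stands in for opening the real gRPC stream.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the start-once / no-start-after-shutdown lifecycle.
// startStream() only counts calls instead of opening a gRPC stream.
class LifecycleSketch {
  private boolean started = false; // guarded by this
  private boolean isShutdown = false; // guarded by this
  final AtomicInteger startStreamCalls = new AtomicInteger();

  public final void start() {
    boolean shouldStartStream = false;
    synchronized (this) {
      // Decide under the lock so two concurrent start() calls cannot both win.
      if (!isShutdown && !started) {
        started = true;
        shouldStartStream = true;
      }
    }
    // Do the (potentially blocking) stream setup without holding the lock.
    if (shouldStartStream) {
      startStream();
    }
  }

  public final synchronized void shutdown() {
    isShutdown = true;
  }

  private void startStream() {
    startStreamCalls.incrementAndGet();
  }
}
```

In this sketch, calling `start()` twice opens the stream once, and calling it after `shutdown()` does nothing.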



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -49,46 +44,52 @@
  * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
  * broken stream.
  *
- * Subclasses should override onResponse to handle responses from the server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as sending headers or
- * retrying requests.
+ * Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
+ * such as sending headers or retrying requests.
  *
- * send and startStream should not be called from onResponse; use executor() instead.
+ * {@link #send(RequestT)} and {@link #startStream()} should not be called from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * Synchronization on this is used to synchronize the gRpc stream state and internal data
  * structures. Since grpc channel operations may block, synchronization on this stream may also
  * block. This is generally not a problem since streams are used in a single-threaded manner.
  * However, some accessors used for status page and other debugging need to take care not to require
  * synchronizing on this.
+ *
+ * {@link #start()} and {@link #shutdown()} are called once in the lifetime of the stream. Once
+ * {@link #shutdown()}, a stream is considered invalid and cannot be restarted/reused.
  */
 public abstract class AbstractWindmillStream implements WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdow

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830228776


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Request observer that allows resetting its internal delegate using the given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
+ * {@link ThreadSafe}.
+ */
+@ThreadSafe
+@Internal
+final class ResettableStreamObserver implements StreamObserver {
+  private final Supplier> streamObserverFactory;
+
+  @GuardedBy("this")
+  private @Nullable StreamObserver delegateStreamObserver;
+
+  @GuardedBy("this")
+  private boolean isPoisoned;
+
+  ResettableStreamObserver(Supplier> streamObserverFactory) {
+this.streamObserverFactory = streamObserverFactory;
+this.delegateStreamObserver = null;
+this.isPoisoned = false;
+  }
+
+  private synchronized StreamObserver delegate() {
+if (isPoisoned) {
+  throw new WindmillStreamShutdownException("Explicit call to shutdown stream.");
+}
+
+return Preconditions.checkNotNull(
+delegateStreamObserver,
+"requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  }
+
+  /** Creates a new delegate to use for future {@link StreamObserver} methods. */
+  synchronized void reset() {
+if (isPoisoned) {
+  throw new WindmillStreamShutdownException("Explicit call to shutdown stream.");
+}
+
+delegateStreamObserver = streamObserverFactory.get();
+  }
+
+  /**
+   * Indicates that the request observer should no longer be used. Attempts to perform operations on
+   * the request observer will throw an {@link WindmillStreamShutdownException}.
+   */
+  synchronized void poison() {
+if (!isPoisoned) {
+  isPoisoned = true;
+  if (delegateStreamObserver != null) {
+delegateStreamObserver.onError(
+new WindmillStreamShutdownException("Explicit call to shutdown stream."));
+  }

Review Comment:
   done
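The reset/poison contract of `ResettableStreamObserver` can be illustrated without gRPC. In this sketch, the `SimpleObserver` interface is a hypothetical stand-in for gRPC's `StreamObserver`, `IllegalStateException` stands in for `WindmillStreamShutdownException`, and `Objects.requireNonNull` replaces Guava's `Preconditions.checkNotNull`; it mirrors the structure of the diff but is not the Beam class.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for gRPC's StreamObserver<T>, so the sketch has no dependencies.
interface SimpleObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

// Sketch of the reset/poison pattern (not the Beam class).
// IllegalStateException stands in for WindmillStreamShutdownException.
final class ResettableObserverSketch<T> implements SimpleObserver<T> {
  private final Supplier<SimpleObserver<T>> streamObserverFactory;
  private SimpleObserver<T> delegateObserver; // guarded by this; null until reset()
  private boolean isPoisoned; // guarded by this

  ResettableObserverSketch(Supplier<SimpleObserver<T>> streamObserverFactory) {
    this.streamObserverFactory = streamObserverFactory;
  }

  private synchronized SimpleObserver<T> delegate() {
    if (isPoisoned) {
      throw new IllegalStateException("Explicit call to shutdown stream.");
    }
    return Objects.requireNonNull(delegateObserver, "Missing a call to reset() to initialize.");
  }

  /** Replaces the delegate, e.g. each time the underlying stream is reopened. */
  synchronized void reset() {
    if (isPoisoned) {
      throw new IllegalStateException("Explicit call to shutdown stream.");
    }
    delegateObserver = streamObserverFactory.get();
  }

  /** Permanently disables this observer and fails the current delegate, if any. */
  synchronized void poison() {
    if (!isPoisoned) {
      isPoisoned = true;
      if (delegateObserver != null) {
        delegateObserver.onError(new IllegalStateException("Explicit call to shutdown stream."));
      }
    }
  }

  // The lock is released before calling into the delegate, so delegates must be thread-safe.
  @Override
  public void onNext(T value) {
    delegate().onNext(value);
  }

  @Override
  public void onError(Throwable t) {
    delegate().onError(t);
  }

  @Override
  public void onCompleted() {
    delegate().onCompleted();
  }
}
```

Because `delegate()` releases the lock before the delegate is invoked, a slow `onNext()` never blocks `poison()`; `poison()` may then call `onError()` concurrently, which is why the diff's `@implNote` asks for thread-safe delegates.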






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830060117


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -342,62 +391,88 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
   batch.addRequest(request);
 }
 if (responsibleForSend) {
-  if (waitForSendLatch == null) {
+  if (prevBatch == null) {
 // If there was not a previous batch wait a little while to improve
 // batching.
-Thread.sleep(1);
+sleeper.sleep(1);
   } else {
-waitForSendLatch.await();
+prevBatch.waitForSendOrFailNotification();
   }
   // Finalize the batch so that no additional requests will be added.  Leave the batch in the
   // queue so that a subsequent batch will wait for its completion.
-  synchronized (batches) {
-verify(batch == batches.peekFirst());
+  synchronized (shutdownLock) {
+if (hasReceivedShutdownSignal()) {
+  throw shutdownException(batch);
+}
+
+verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
 batch.markFinalized();
   }
-  sendBatch(batch.requests());
-  synchronized (batches) {
-verify(batch == batches.pollFirst());
+  trySendBatch(batch);
+} else {
+  // Wait for this batch to be sent before parsing the response.
+  batch.waitForSendOrFailNotification();
+}
+  }
+
+  void trySendBatch(QueuedBatch batch) {
+try {
+  sendBatch(batch);
+  synchronized (shutdownLock) {
+if (hasReceivedShutdownSignal()) {
+  throw shutdownException(batch);
+}
+
+verify(
+batch == batches.pollFirst(),
+"Sent GetDataStream request batch removed before send() was complete.");
   }
   // Notify all waiters with requests in this batch as well as the sender
   // of the next batch (if one exists).
-  batch.countDown();
-} else {
-  // Wait for this batch to be sent before parsing the response.
-  batch.await();
+  batch.notifySent();
+} catch (Exception e) {
+  // Free waiters if the send() failed.
+  batch.notifyFailed();
+  // Propagate the exception to the calling thread.
+  throw e;
 }
   }
 
-  @SuppressWarnings("NullableProblems")
-  private void sendBatch(List requests) {
-StreamingGetDataRequest batchedRequest = flushToBatch(requests);
+  private void sendBatch(QueuedBatch batch) {
+if (batch.isEmpty()) {
+  return;
+}
+
+// Synchronization of pending inserts is necessary with send to ensure duplicates are not
+// sent on stream reconnect.
 synchronized (this) {
-  // Synchronization of pending inserts is necessary with send to ensure duplicates are not
-  // sent on stream reconnect.
-  for (QueuedRequest request : requests) {
+  synchronized (shutdownLock) {
+// shutdown() clears pending, once the stream is shutdown, prevent values from being added

Review Comment:
   Whoops, that was supposed to be done within the lock. Done.
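The point of `trySendBatch()` is that threads waiting on a batch are released whether `send()` succeeds or throws. That handshake can be sketched with a `CountDownLatch` plus a failure flag. `BatchSketch` and `BatchSender` are hypothetical names, the exception type is an assumption, and the real `QueuedBatch` may be implemented differently.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the send/fail notification; not Beam's QueuedBatch.
final class BatchSketch {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean failed = false;

  /** Called by the sending thread after the batch was written to the stream. */
  void notifySent() {
    done.countDown();
  }

  /** Called by the sending thread if send() threw, so waiters are never stuck. */
  void notifyFailed() {
    failed = true; // written before countDown(), so waiters observe it after await()
    done.countDown();
  }

  /** Blocks until the batch was sent or failed; throws if sending failed. */
  void waitForSendOrFailNotification() throws InterruptedException {
    done.await();
    if (failed) {
      throw new IllegalStateException("Batch was not sent; the stream may have been shut down.");
    }
  }
}

final class BatchSender {
  /** Mirrors the shape of trySendBatch(): always release waiters, then propagate failures. */
  static void trySendBatch(BatchSketch batch, Runnable send) {
    try {
      send.run();
      batch.notifySent();
    } catch (RuntimeException e) {
      batch.notifyFailed();
      throw e;
    }
  }
}
```

Waiters on a failed batch wake up and see an exception instead of blocking forever, and the sending thread still rethrows the original error to its caller.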






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830055450


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -49,22 +49,29 @@
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
 import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@ThreadSafe
 final class GrpcGetDataStream
 extends AbstractWindmillStream
 implements GetDataStream {
   private static final Logger LOG = LoggerFactory.getLogger(GrpcGetDataStream.class);
+  private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
+  StreamingGetDataRequest.newBuilder().build();
 
+  /** @implNote {@link QueuedBatch} objects in the queue are guarded by {@link #shutdownLock} */
   private final Deque batches;

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830054321


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
   int streamingRpcBatchLimit,
   boolean sendKeyedGetDataRequests,
   Consumer> processHeartbeatResponses) {
-GrpcGetDataStream getDataStream =
-new GrpcGetDataStream(
-backendWorkerToken,
-startGetDataRpcFn,
-backoff,
-streamObserverFactory,
-streamRegistry,
-logEveryNStreamFailures,
-getDataThrottleTimer,
-jobHeader,
-idGenerator,
-streamingRpcBatchLimit,
-sendKeyedGetDataRequests,
-processHeartbeatResponses);
-getDataStream.startStream();
-return getDataStream;
+return new GrpcGetDataStream(
+backendWorkerToken,
+startGetDataRpcFn,
+backoff,
+streamObserverFactory,
+streamRegistry,
+logEveryNStreamFailures,
+getDataThrottleTimer,
+jobHeader,
+idGenerator,
+streamingRpcBatchLimit,
+sendKeyedGetDataRequests,
+processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {
+return new WindmillStreamShutdownException(
+"Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedRequest request) {
+return new WindmillStreamShutdownException(
+"Cannot send request=[" + request + "] on closed stream.");
   }
 
   @Override
   protected synchronized void onNewStream() {
+if (hasReceivedShutdownSignal()) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830053973


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
   int streamingRpcBatchLimit,
   boolean sendKeyedGetDataRequests,
   Consumer> processHeartbeatResponses) {
-GrpcGetDataStream getDataStream =
-new GrpcGetDataStream(
-backendWorkerToken,
-startGetDataRpcFn,
-backoff,
-streamObserverFactory,
-streamRegistry,
-logEveryNStreamFailures,
-getDataThrottleTimer,
-jobHeader,
-idGenerator,
-streamingRpcBatchLimit,
-sendKeyedGetDataRequests,
-processHeartbeatResponses);
-getDataStream.startStream();
-return getDataStream;
+return new GrpcGetDataStream(
+backendWorkerToken,
+startGetDataRpcFn,
+backoff,
+streamObserverFactory,
+streamRegistry,
+logEveryNStreamFailures,
+getDataThrottleTimer,
+jobHeader,
+idGenerator,
+streamingRpcBatchLimit,
+sendKeyedGetDataRequests,
+processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830052780


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##
@@ -201,7 +198,7 @@ private void maybeSendRequestExtension(GetWorkBudget extension) {
   @Override
   protected synchronized void onNewStream() {
 workItemAssemblers.clear();
-if (!isShutdown()) {
+if (!hasReceivedShutdownSignal()) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830052130


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {

Review Comment:
   done
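The reviewed `send()` checks for shutdown first and silently drops the request, and only then treats a closed stream as a caller error. Below is a hypothetical model of just that guard order, not Beam code: a `List` stands in for `requestObserver`, `markStreamClosed()` is an invented helper, and the `StreamObserverCancelledException` handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the guard order in the reviewed send(); not Beam code.
class SendGuardSketch {
  private final Object shutdownLock = new Object();
  private boolean isShutdown = false; // guarded by shutdownLock
  private boolean streamClosed = false; // guarded by this
  final List<String> sent = new ArrayList<>(); // guarded by this

  boolean hasReceivedShutdownSignal() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  synchronized void markStreamClosed() {
    streamClosed = true;
  }

  final void send(String request) {
    synchronized (this) {
      // 1. After an explicit shutdown, requests are silently dropped.
      if (hasReceivedShutdownSignal()) {
        return;
      }
      // 2. Sending on a closed (but not shut down) stream is a caller error.
      if (streamClosed) {
        throw new IllegalStateException("Send called on a client closed stream.");
      }
      // 3. Otherwise forward the request to the underlying observer.
      sent.add(request);
    }
  }
}
```

Lock order in the sketch is always `this` then `shutdownLock`, and `shutdown()` takes only `shutdownLock`, so it never waits for an in-flight send.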






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830042998


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+debugMetrics.recordSend();
+requestObserver.onNext(request);
+  } catch (StreamObserverCancelledException e) {
+if (hasReceivedShutdownSignal()) {
+  logger.debug("Stream was shutdown during send.", e);
+  return;
+}
+
+requestObserver.onError(e);
+  }
+}
+  }
+
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
+synchronized (shutdownLock) {
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
+  }
+}
+
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  if (hasReceivedShutdownSignal()) {
+break;
+  }
+  debugMetrics.recordStart();
+  streamClosed = false;
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown waiting to start.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",
+  getClass());
+  break;
+} catch (IOException ioe) {
   // Keep trying to create the stream.
 }
   }
 }
+
+// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+// removed when closed.
+streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+try {
+  executor.execute(runnable);
+} catch (RejectedExecutionException e) {
+  logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+}
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThresh

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830042375


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+debugMetrics.recordSend();
+requestObserver.onNext(request);
+  } catch (StreamObserverCancelledException e) {
+if (hasReceivedShutdownSignal()) {
+  logger.debug("Stream was shutdown during send.", e);
+  return;
+}
+
+requestObserver.onError(e);
+  }
+}
+  }
+
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
+synchronized (shutdownLock) {
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
+  }
+}
+
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  if (hasReceivedShutdownSignal()) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830041423


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {

Review Comment:
   done






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1829911016


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+debugMetrics.recordSend();
+requestObserver.onNext(request);
+  } catch (StreamObserverCancelledException e) {
+if (hasReceivedShutdownSignal()) {
+  logger.debug("Stream was shutdown during send.", e);
+  return;
+}
+
+requestObserver.onError(e);
+  }
+}
+  }
+
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
+synchronized (shutdownLock) {
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
+  }
+}
+
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  if (hasReceivedShutdownSignal()) {
+break;
+  }
+  debugMetrics.recordStart();
+  streamClosed = false;
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown waiting to start.", e);

Review Comment:
   done
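With the new catch of `WindmillStreamShutdownException`, a shutdown during stream creation is no longer logged as an error and retried with backoff; the next iteration's shutdown check then exits the loop. A hedged sketch of that control flow follows. `RetryingStarter`, the `Sleeper` interface, the fixed backoff, and the `Set` registry are illustrative assumptions replacing Beam's `BackOff`, `sleeper`, and `streamRegistry`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the startStream() retry loop; not Beam code.
final class RetryingStarter {
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  final Set<Object> registry = new HashSet<>();

  /** Returns true once the stream opens; false on shutdown or interrupt. */
  boolean startStream(
      Runnable openStream, BooleanSupplier isShutdown, long backoffMillis, Sleeper sleeper) {
    registry.add(this);
    while (true) {
      if (isShutdown.getAsBoolean()) {
        break; // shutdown() owns cleanup of pending work; stop retrying
      }
      try {
        openStream.run();
        return true; // stays registered until the stream is closed
      } catch (RuntimeException e) {
        try {
          sleeper.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve interrupt status for callers
          break;
        }
      }
    }
    // The stream never started, so it is removed from the registry here.
    registry.remove(this);
    return false;
  }
}
```

Restoring the interrupt flag before giving up, as the diff does, lets callers further up the stack still observe the interruption.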






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-11-05 Thread via GitHub


scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1829093210


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");

Review Comment:
   I would remove this since it seems likely expensive.
   Instead you could verify with a test:
   - setup requestObserver that blocks until notified
   - one thread calls send and starts blocking
   - main test thread calls shutdown() and verifies the method returns
   - main test thread unblocks the requestObserver
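One way to write the test suggested above, as a hedged sketch. `SketchStream` is a hypothetical stand-in that, like the quoted `send()`, holds the stream monitor while writing, while its `shutdown()` only takes a separate `shutdownLock`. The observer blocks inside `onNext` until released, and the check asserts that `shutdown()` completes while `send()` is still blocked. As a deliberate variation on the suggestion, `shutdown()` runs on a pool thread with a timeout rather than directly on the test thread, so a regression fails fast instead of hanging the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the stream under test; not Beam's AbstractWindmillStream. */
final class SketchStream {
  interface Observer {
    void onNext(String request) throws InterruptedException;
  }

  private final Object shutdownLock = new Object();
  private final Observer observer;
  private boolean isShutdown = false; // guarded by shutdownLock

  SketchStream(Observer observer) {
    this.observer = observer;
  }

  /** Holds the stream monitor during the (possibly blocking) write. */
  synchronized void send(String request) throws InterruptedException {
    observer.onNext(request);
  }

  /** Only takes shutdownLock, so it cannot block behind a stuck send(). */
  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  boolean isShutdown() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }
}

final class ShutdownDuringSendCheck {
  /** Returns true if shutdown() completed while send() was blocked in the observer. */
  static boolean shutdownReturnsWhileSendBlocks() {
    CountDownLatch entered = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    SketchStream stream =
        new SketchStream(
            request -> {
              entered.countDown(); // send() is now blocked inside onNext
              release.await(); // stay blocked until the check releases it
            });
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Future<?> sendFuture = pool.submit(() -> { stream.send("req"); return null; });
      if (!entered.await(5, TimeUnit.SECONDS)) {
        return false;
      }
      // A TimeoutException here means shutdown() blocked behind send().
      pool.submit(stream::shutdown).get(5, TimeUnit.SECONDS);
      boolean ok = stream.isShutdown() && !sendFuture.isDone();
      release.countDown(); // unblock the observer
      sendFuture.get(5, TimeUnit.SECONDS);
      return ok;
    } catch (Exception e) {
      return false;
    } finally {
      release.countDown();
      pool.shutdownNow();
    }
  }
}
```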
   



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+synchronized (shutdownLock) {
+  return isShutdown;
 }
-
-return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (hasReceivedShutdownSignal()) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+debugMetrics.recordSend();
+requestObserver.onNext(request);
+  } catch (StreamObserverCancelledException e) {
+if (hasReceivedShutdownSignal()) {
+  logger.debug("Stream was shutdown during send.", e);
+  return;
+}
+
+requestObserver.onError(e);
+  }
+}
+  }
+
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
+synchronized (shutdownLock) {
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
+  }
+}
+
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  if (hasReceivedShutdownSignal()) {

Review Comment:
   this seems racy since shutdown could happen right after; the requestObserver poisoning already handles it below.
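The reviewer's point is that checking a shutdown flag before writing is a check-then-act race: shutdown can land between the check and the write. A minimal sketch of the alternative, assuming a hypothetical observer whose `poison()` and `onNext()` share one lock, so each request either goes out before the poisoning or fails afterwards, and the caller treats that failure as a shutdown. `PoisonableObserver`, `ObserverPoisonedException` and `PoisonSendSketch` are illustrative names. In a real gRPC stream, where the write itself can block, poisoning would also have to wake the blocked writer, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical: thrown when a request is written to a poisoned (shut down) observer. */
final class ObserverPoisonedException extends RuntimeException {
  ObserverPoisonedException(String message) {
    super(message);
  }
}

/** Hypothetical observer: poison() and onNext() share one monitor, so there is no race window. */
final class PoisonableObserver {
  private final List<String> sent = new ArrayList<>(); // guarded by this
  private boolean poisoned = false; // guarded by this

  synchronized void onNext(String request) {
    if (poisoned) {
      throw new ObserverPoisonedException("observer was shut down");
    }
    sent.add(request); // stands in for the real network write
  }

  synchronized void poison() {
    poisoned = true;
  }

  synchronized List<String> sent() {
    return new ArrayList<>(sent);
  }
}

final class PoisonSendSketch {
  /** Returns false instead of throwing when shutdown won the race with this send. */
  static boolean trySend(PoisonableObserver observer, String request) {
    try {
      observer.onNext(request);
      return true;
    } catch (ObserverPoisonedException e) {
      return false; // the quoted send() similarly just logs at debug and returns
    }
  }
}
```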



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long star

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on PR #32774:
URL: https://github.com/apache/beam/pull/32774#issuecomment-2451099981

   back to you @scwhittle thank you!





Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824900354


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import com.google.auto.value.AutoValue;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/** Records stream metrics for debugging. */
+@ThreadSafe
+final class StreamDebugMetrics {
+  private final AtomicInteger restartCount = new AtomicInteger();
+  private final AtomicInteger errorCount = new AtomicInteger();
+
+  @GuardedBy("this")
+  private long sleepUntil = 0;
+
+  @GuardedBy("this")
+  private String lastRestartReason = "";
+
+  @GuardedBy("this")
+  private DateTime lastRestartTime = null;
+
+  @GuardedBy("this")
+  private long lastResponseTimeMs = 0;
+
+  @GuardedBy("this")
+  private long lastSendTimeMs = 0;
+
+  @GuardedBy("this")
+  private long startTimeMs = 0;
+
+  @GuardedBy("this")
+  private DateTime shutdownTime = null;
+
+  private static long debugDuration(long nowMs, long startMs) {
+return startMs <= 0 ? -1 : Math.max(0, nowMs - startMs);
+  }
+
+  private static long nowMs() {
+return Instant.now().getMillis();
+  }
+
+  synchronized void recordSend() {
+lastSendTimeMs = nowMs();
+  }
+
+  synchronized void recordStart() {
+startTimeMs = nowMs();
+lastResponseTimeMs = 0;
+  }
+
+  synchronized void recordResponse() {
+lastResponseTimeMs = nowMs();
+  }
+
+  synchronized void recordRestartReason(String error) {
+lastRestartReason = error;
+lastRestartTime = DateTime.now();
+  }
+
+  synchronized long startTimeMs() {

Review Comment:
   done
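The `debugDuration` helper quoted in `StreamDebugMetrics` returns -1 when a timestamp was never recorded (`startMs <= 0`) and otherwise the elapsed time clamped at zero, so clock skew never yields a negative duration. The sketch below reproduces that helper and adds a hypothetical `SendTimeSketch` that records send times under its own monitor, similar in spirit to `recordSend()`. Unlike the quoted `nowMs()`, it takes the current time as a parameter so its behavior is deterministic.

```java
/** Reproduces the quoted debugDuration helper for illustration. */
final class DebugDurationSketch {
  /** -1 means "never recorded"; otherwise elapsed millis, clamped at 0. */
  static long debugDuration(long nowMs, long startMs) {
    return startMs <= 0 ? -1 : Math.max(0, nowMs - startMs);
  }
}

/** Hypothetical metrics holder; synchronized like the quoted StreamDebugMetrics methods. */
final class SendTimeSketch {
  private long lastSendTimeMs = 0; // guarded by this; 0 means "never sent"

  synchronized void recordSend(long nowMs) {
    lastSendTimeMs = nowMs;
  }

  synchronized long millisSinceLastSend(long nowMs) {
    return DebugDurationSketch.debugDuration(nowMs, lastSendTimeMs);
  }
}
```

For example, `debugDuration(1000, 400)` is 600, `debugDuration(300, 400)` is 0, and any call with `startMs` of 0 is -1.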






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824930724


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##
@@ -204,56 +224,64 @@ private void flushInternal(Map requests) {
 }
   }
 
-  private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
+  private void issueSingleRequest(long id, PendingRequest pendingRequest) {
+if (!prepareForSend(id, pendingRequest)) {
+  pendingRequest.abort();
+  return;
+}
+
 StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder();
 requestBuilder
 .addCommitChunkBuilder()
-.setComputationId(pendingRequest.computation)
+.setComputationId(pendingRequest.computationId())
 .setRequestId(id)
-.setShardingKey(pendingRequest.request.getShardingKey())
-.setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+.setShardingKey(pendingRequest.shardingKey())
+.setSerializedWorkItemCommit(pendingRequest.serializedCommit());
 StreamingCommitWorkRequest chunk = requestBuilder.build();
-synchronized (this) {
-  pending.put(id, pendingRequest);
-  try {
-send(chunk);
-  } catch (IllegalStateException e) {
-// Stream was broken, request will be retried when stream is reopened.
-  }
+try {
+  send(chunk);
+} catch (IllegalStateException e) {
+  // Stream was broken, request will be retried when stream is reopened.
 }
   }
 
   private void issueBatchedRequest(Map requests) {
+if (!prepareForSend(requests)) {
+  requests.forEach((ignored, pendingRequest) -> pendingRequest.abort());
+  return;
+}
+
 StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder();
 String lastComputation = null;
 for (Map.Entry entry : requests.entrySet()) {
   PendingRequest request = entry.getValue();
   StreamingCommitRequestChunk.Builder chunkBuilder = requestBuilder.addCommitChunkBuilder();
-  if (lastComputation == null || !lastComputation.equals(request.computation)) {
-chunkBuilder.setComputationId(request.computation);
-lastComputation = request.computation;
+  if (lastComputation == null || !lastComputation.equals(request.computationId())) {
+chunkBuilder.setComputationId(request.computationId());
+lastComputation = request.computationId();
   }
-  chunkBuilder.setRequestId(entry.getKey());
-  chunkBuilder.setShardingKey(request.request.getShardingKey());
-  chunkBuilder.setSerializedWorkItemCommit(request.request.toByteString());
+  chunkBuilder
+  .setRequestId(entry.getKey())
+  .setShardingKey(request.shardingKey())
+  .setSerializedWorkItemCommit(request.serializedCommit());
 }
 StreamingCommitWorkRequest request = requestBuilder.build();
-synchronized (this) {
-  pending.putAll(requests);
-  try {
-send(request);
-  } catch (IllegalStateException e) {
-// Stream was broken, request will be retried when stream is reopened.
-  }
+try {
+  send(request);
+} catch (IllegalStateException e) {
+  // Stream was broken, request will be retried when stream is reopened.
 }
   }
 
-  private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-checkNotNull(pendingRequest.computation);
-final ByteString serializedCommit = pendingRequest.request.toByteString();
+  private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) {
+if (!prepareForSend(id, pendingRequest)) {
+  pendingRequest.abort();
+  return;
+}
 
+checkNotNull(pendingRequest.computationId(), "Cannot commit WorkItem w/o a computationId.");
+ByteString serializedCommit = pendingRequest.serializedCommit();
 synchronized (this) {
-  pending.put(id, pendingRequest);
   for (int i = 0;

Review Comment:
   done
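In the rewritten `issueSingleRequest`, `issueBatchedRequest` and `issueMultiChunkRequest`, each request is first registered through `prepareForSend(...)` and aborted if that returns false. The body of `prepareForSend` is not shown in this thread, so the following is only a guess at its general shape: registration and the shutdown flag share one lock, and shutdown aborts whatever is still pending so no caller waits forever. `PendingCommitsSketch`, its lock choice and the abort-on-shutdown behavior are assumptions for illustration, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a prepare-then-send pattern; not GrpcCommitWorkStream. */
final class PendingCommitsSketch {
  /** Hypothetical pending request that only tracks whether it was aborted. */
  static final class PendingRequest {
    private volatile boolean aborted = false;

    void abort() {
      aborted = true;
    }

    boolean isAborted() {
      return aborted;
    }
  }

  private final Object shutdownLock = new Object();
  private final Map<Long, PendingRequest> pending = new HashMap<>(); // guarded by shutdownLock
  private boolean isShutdown = false; // guarded by shutdownLock

  /** Registers the request unless shut down; returns whether the caller may send it. */
  boolean prepareForSend(long id, PendingRequest request) {
    synchronized (shutdownLock) {
      if (isShutdown) {
        return false;
      }
      pending.put(id, request);
      return true;
    }
  }

  void issueSingleRequest(long id, PendingRequest request) {
    if (!prepareForSend(id, request)) {
      request.abort();
      return;
    }
    // Build the chunk and call send(...) here; omitted in this sketch.
  }

  /** Marks shutdown and aborts everything still pending. */
  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
      pending.values().forEach(PendingRequest::abort);
      pending.clear();
    }
  }

  int pendingCount() {
    synchronized (shutdownLock) {
      return pending.size();
    }
  }
}
```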






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824914980


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long startMs) {
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+return isShutdown;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (isShutdown) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
+  } catch (StreamObserverCancelledException e) {
+if (isShutdown) {
+  logger.debug("Stream was shutdown during send.", e);
+  return;
+}
+
+requestObserver.onError(e);
+  }
+}
+  }
+
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
+synchronized (shutdownLock) {
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
+  }
+}
+
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  if (isShutdown) {
+break;
+  }
+  debugMetrics.recordStart();
+  streamClosed = false;
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown waiting to start.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",
+  getClass());
+  break;
+} catch (IOException ioe) {
   // Keep trying to create the stream.
 }
   }
 }
+
+// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+// removed when closed.
+streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+try {
+  executor.execute(runnable);
+} catch (RejectedExecutionException e) {
+  logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+}
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+if (!clientClosed && debugMetrics.lastSendTimeMs() < lastSendThreshold.getMillis()) {
   try {
 sendHealthCheck();
   } catch (RuntimeException e) {
-LOG.debug("Received exception sending health check.", e);
+logger.debug("Received exception sending health check.", e);
   }
 }
   }
 
   protected abstract void sendH

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824906796


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -49,46 +44,64 @@
  * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
  * broken stream.
  *
- * Subclasses should override onResponse to handle responses from the server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as sending headers or
- * retrying requests.
+ * Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
+ * such as sending headers or retrying requests.
  *
- * send and startStream should not be called from onResponse; use executor() instead.
+ * {@link #send(RequestT)} and {@link #startStream()} should not be called from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * Synchronization on this is used to synchronize the gRpc stream state and internal data
  * structures. Since grpc channel operations may block, synchronization on this stream may also
  * block. This is generally not a problem since streams are used in a single-threaded manner.
  * However, some accessors used for status page and other debugging need to take care not to require
  * synchronizing on this.
+ *
+ * {@link #start()} and {@link #shutdown()} are called once in the lifetime of the stream. Once
+ * {@link #shutdown()}, a stream is considered invalid and cannot be restarted/reused.
  */
 public abstract class AbstractWindmillStream implements WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} in the same context,

Review Comment:
   done
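The class Javadoc quoted above states that `start()` and `shutdown()` are each called once and that a shut down stream cannot be restarted, and the `@implNote` warns against holding `shutdownLock` during IO. The quoted `start()` follows this by deciding under the lock and doing the work outside it. A minimal sketch of that pattern; `LifecycleSketch` and `openStream()` are hypothetical, and `openStream()` merely counts calls in place of real stream setup.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical start-once / shutdown-once lifecycle; not AbstractWindmillStream. */
final class LifecycleSketch {
  private final Object shutdownLock = new Object();
  private boolean started = false; // guarded by shutdownLock
  private boolean isShutdown = false; // guarded by shutdownLock
  final AtomicInteger openCalls = new AtomicInteger(); // counts openStream() calls

  /** Decide under the lock, then do the (IO) work without holding it. */
  void start() {
    boolean shouldStart = false;
    synchronized (shutdownLock) {
      if (!isShutdown && !started) {
        started = true;
        shouldStart = true;
      }
    }
    if (shouldStart) {
      openStream();
    }
  }

  /** Idempotent; once shut down, start() becomes a no-op. */
  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  private void openStream() {
    openCalls.incrementAndGet(); // stands in for creating the gRPC stream
  }
}
```

Calling `start()` twice opens the stream once, and calling `start()` after `shutdown()` opens nothing.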






Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824913419


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long startMs) {
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {
-return isShutdown.get();
-  }
-
-  private StreamObserver requestObserver() {
-if (requestObserver == null) {
-  throw new NullPointerException(
-  "requestObserver cannot be null. Missing a call to startStream() to initialize.");
-}
-
-return requestObserver;
+return isShutdown;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-lastSendTimeMs.set(Instant.now().getMillis());
 synchronized (this) {
-  if (streamClosed.get()) {
+  if (isShutdown) {
+return;
+  }
+
+  if (streamClosed) {
+// TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
 throw new IllegalStateException("Send called on a client closed stream.");
   }
 
-  requestObserver().onNext(request);
+  try {
+debugMetrics.recordSend();
+requestObserver.onNext(request);
+  } catch (StreamObserverCancelledException e) {
+if (isShutdown) {
+  logger.debug("Stream was shutdown during send.", e);
+  return;
+}
+
+requestObserver.onError(e);
+  }
+}
+  }
+
+  @Override
+  public final void start() {
+boolean shouldStartStream = false;
+synchronized (shutdownLock) {
+  if (!isShutdown && !started) {
+started = true;
+shouldStartStream = true;
+  }
+}
+
+if (shouldStartStream) {
+  startStream();
 }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
 // Add the stream to the registry after it has been fully constructed.
 streamRegistry.add(this);
 while (true) {
   try {
 synchronized (this) {
-  startTimeMs.set(Instant.now().getMillis());
-  lastResponseTimeMs.set(0);
-  streamClosed.set(false);
-  // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-  requestObserver = requestObserverSupplier.get();
+  if (isShutdown) {
+break;
+  }
+  debugMetrics.recordStart();
+  streamClosed = false;
+  requestObserver.reset();
   onNewStream();
-  if (clientClosed.get()) {
+  if (clientClosed) {
 halfClose();
   }
   return;
 }
+  } catch (WindmillStreamShutdownException e) {
+logger.debug("Stream was shutdown waiting to start.", e);
   } catch (Exception e) {
-LOG.error("Failed to create new stream, retrying: ", e);
+logger.error("Failed to create new stream, retrying: ", e);
 try {
   long sleep = backoff.nextBackOffMillis();
-  sleepUntil.set(Instant.now().getMillis() + sleep);
-  Thread.sleep(sleep);
-} catch (InterruptedException | IOException i) {
+  debugMetrics.recordSleep(sleep);
+  sleeper.sleep(sleep);
+} catch (InterruptedException ie) {
+  Thread.currentThread().interrupt();
+  logger.info(
+  "Interrupted during {} creation backoff. The stream will not be created.",
+  getClass());
+  break;
+} catch (IOException ioe) {
   // Keep trying to create the stream.
 }
   }
 }
+
+// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+// removed when closed.
+streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+try {
+  executor.execute(runnable);
+} catch (RejectedExecutionException e) {
+  logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+}
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
-if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+if (!clientClosed && debugMetrics.lastSendTimeMs() < lastSendThreshold.getMillis()) {
   try {
 sendHealthCheck();
   } catch (RuntimeException e) {
-LOG.debug("Received exception sending health check.", e);
+logger.debug("Received exception sending health check.", e);
   }
 }
   }
 
   protected abstract void sendH

Re: [PR] add shutdown and start mechanics to windmill streams [beam]

2024-10-31 Thread via GitHub


m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824910797


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##
@@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long startMs) {
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {

Review Comment:
   done





