Re: [PR] add a way for channels to be closed manually [beam]

2024-03-27 Thread via GitHub


kennknowles commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2022873151

   I quickly checked and the missing class has been in place for 10 years so 
you can ignore the second bullet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] add a way for channels to be closed manually [beam]

2024-03-27 Thread via GitHub


kennknowles commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2022870744

   > @Abacn @kennknowles It looks like it cannot find a dependency I added.
   > 
   > I only added it to this Gradle file: 
https://github.com/apache/beam/pull/30425/files#diff-c639d6f102d06d6d74d4d5c0d829d517ca720bf0b95708d10c86cbd4b847c65d
   > 
   > Do I need to add it elsewhere?
   
   Just to clarify: the library itself was successfully downloaded. Your 
dependency looks about right, but you should put it in the central list of 
dependencies in `BeamModulePlugin` like all the others, so we coordinate 
versions across modules.
   
   Common sources of this problem that I can think of:
   
   - Someone used a `provided` or `compileOnly` scope dependency somewhere in 
the transitive chain, so the library never made it onto the runtime classpath 
(not your fault, but it needs to be solved by turning it into an 
`implementation` dependency).
   - Another module depends on the same library but at an earlier version, so 
that particular class is not found because the earlier version got precedence 
(the rules for precedence are "closer to the root of the transitive dependency 
chain").





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-20 Thread via GitHub


m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2011033112

   @Abacn @kennknowles It looks like it cannot find a dependency I added.
   
   I only added it to this Gradle file: 
https://github.com/apache/beam/pull/30425/files#diff-c639d6f102d06d6d74d4d5c0d829d517ca720bf0b95708d10c86cbd4b847c65d
   
   Do I need to add it elsewhere?





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-20 Thread via GitHub


Abacn commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2010319013

   This now blocks open Java PRs; we should revert it and then investigate.





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-20 Thread via GitHub


Abacn commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2010317923

   It is related: The push trigger for this PR is the first failed one: 
https://github.com/apache/beam/actions/runs/8345017925
   
   
![image](https://github.com/apache/beam/assets/8010435/81593bbd-3ade-498e-a321-2b642ff4769c)
   





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-20 Thread via GitHub


kennknowles commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2009858870

   On #30545 I'm seeing this failure crashlooping the streaming wordcount 
integration tests:
   
   ```
   java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/github/benmanes/caffeine/cache/RemovalCause
   2024-03-20 10:54:38.890 EDT
     at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache.create(ChannelCache.java:55)
   2024-03-20 10:54:38.891 EDT
     at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer.create(GrpcWindmillServer.java:164)
   2024-03-20 10:54:38.891 EDT
     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.createWindmillServerStub(StreamingDataflowWorker.java:625)
   2024-03-20 10:54:38.891 EDT
     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.fromOptions(StreamingDataflowWorker.java:466)
   2024-03-20 10:54:38.891 EDT
     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.main(StreamingDataflowWorker.java:577)
   ```
   
   Seems probably related?
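
A `NoClassDefFoundError` like the one above can be narrowed down by asking the JVM whether the class is loadable and, if so, which jar it came from. The following is only a minimal diagnostic sketch: `ClasspathProbe` and `locate` are hypothetical names that are not part of Beam, and it assumes the probe runs with the same class loader as the failing code.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

/** Hypothetical diagnostic helper: reports where (or whether) a class can be loaded from. */
final class ClasspathProbe {
  private ClasspathProbe() {}

  /**
   * Returns the location the class was loaded from, a placeholder string when the JVM reports no
   * code source (for example JDK classes), or empty when the class cannot be loaded at all.
   */
  static Optional<String> locate(String className) {
    try {
      // initialize=false: only load the class, do not run its static initializers.
      Class<?> clazz = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      CodeSource source = clazz.getProtectionDomain().getCodeSource();
      URL location = source == null ? null : source.getLocation();
      return Optional.of(location == null ? "(no code source reported)" : location.toString());
    } catch (ClassNotFoundException | LinkageError e) {
      // NoClassDefFoundError is a LinkageError, so a broken transitive chain also lands here.
      return Optional.empty();
    }
  }
}
```

Note that `Class.forName` takes the dotted form of the name, so the class from the trace would be probed as `ClasspathProbe.locate("com.github.benmanes.caffeine.cache.RemovalCause")`. An empty result would suggest the jar is missing from the runtime classpath, while a jar path with an unexpected version would point toward a version conflict.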





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-19 Thread via GitHub


scwhittle merged PR #30425:
URL: https://github.com/apache/beam/pull/30425





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-18 Thread via GitHub


scwhittle commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2004876597

   Run Java PreCommit





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-18 Thread via GitHub


scwhittle commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2004876092

   > Task :runners:google-cloud-dataflow-java:worker:test
   
   org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannelTest > testAwaitTermination FAILED
   org.mockito.exceptions.verification.TooFewActualInvocations at IsolationChannelTest.java:401
   
   





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-18 Thread via GitHub


m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2004659417

   Run Java Precommit





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-14 Thread via GitHub


m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1998921403

   @scwhittle ready for another look!
   We can switch all of the existing streaming caches to Caffeine in a later 
PR.





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1521880582


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
   done






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1521004735


##
runners/google-cloud-dataflow-java/worker/build.gradle:
##
@@ -71,6 +71,10 @@ def excluded_dependencies = [
 library.java.truth   // Test only
 ]
 
+// For Java8+ and less than Java11, use versions 2.x.x.
+// For Java11+ use versions 3.x.x per https://github.com/ben-manes/caffeine.
+def caffeine_cache_version = project.javaVersion < "11" ? "2.9.3" : "3.1.8"

Review Comment:
   removed the branch, defaulting to the lower version
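
For context on the removed branch: it compared `project.javaVersion` against `"11"` as strings, and string comparison is lexicographic, so a value such as `"9"` would sort after `"11"`. The sketch below shows a numeric comparison instead. It is an illustration only: `JavaVersions` is a hypothetical helper, and the version formats it handles (`"1.8"`, `"11"`, `"17.0.2"`) are assumptions about what such a property might contain.

```java
/** Hypothetical helper that extracts the feature (major) release from a Java version string. */
final class JavaVersions {
  private JavaVersions() {}

  /** Maps "1.8" to 8, "11" to 11 and "17.0.2" to 17. */
  static int featureVersion(String version) {
    String[] parts = version.split("\\.");
    int first = Integer.parseInt(parts[0]);
    // Releases before Java 9 were written as 1.x, so the feature release is the second field.
    if (first == 1 && parts.length > 1) {
      return Integer.parseInt(parts[1]);
    }
    return first;
  }

  /** Numeric comparison, unlike comparing the raw strings. */
  static boolean isBefore(String version, int featureRelease) {
    return featureVersion(version) < featureRelease;
  }
}
```

With this, `JavaVersions.isBefore("9", 11)` is true, whereas `"9".compareTo("11")` is positive. This does not change the review outcome: the version would still be fixed when the artifact is built, so a single Caffeine version is the simpler choice.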






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1521002708


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelCacheTest {
+
+  private ChannelCache cache;
+
+  private static ChannelCache newCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return ChannelCache.forTesting(channelFactory, () -> {});
+  }
+
+  @After
+  public void cleanUp() {
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  private ManagedChannel newChannel(String channelName) {
+    return WindmillChannelFactory.inProcessChannel(channelName);
+  }
+
+  @Test
+  public void testLoadingCacheReturnsExistingChannel() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    // Initial call to load the cache.
+    cache.get(someAddress);
+
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    assertSame(channel, cachedChannel);
+    verify(channelFactory, times(1)).apply(eq(someAddress));
+  }
+
+  @Test
+  public void testLoadingCacheReturnsLoadsChannelWhenNotPresent() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    assertSame(channel, cachedChannel);
+    verify(channelFactory, times(1)).apply(eq(someAddress));
+  }
+
+  @Test
+  public void testRemoveAndClose() throws InterruptedException {

Review Comment:
   done
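
The test file above imports `CountDownLatch`, and `ChannelCache.forTesting` accepts an `onChannelShutdown` hook so that tests do not have to sleep for an arbitrary time. The body of `testRemoveAndClose` is not shown in this thread, so the following is only a stdlib sketch of the general latch-based waiting pattern; `LatchWaitExample` and `runAndAwait` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical example: wait for an asynchronous callback with a latch instead of sleeping. */
final class LatchWaitExample {
  private LatchWaitExample() {}

  /**
   * Runs {@code asyncWork} on a background thread and blocks until its completion hook fires.
   * Returns false if the hook does not fire within the timeout or the wait is interrupted.
   */
  static boolean runAndAwait(Runnable asyncWork, long timeout, TimeUnit unit) {
    CountDownLatch done = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.execute(
          () -> {
            try {
              asyncWork.run();
            } finally {
              // Plays the role of a completion hook such as onChannelShutdown.
              done.countDown();
            }
          });
      return done.await(timeout, unit);
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

A test written this way fails with a clear timeout when the callback never fires, and otherwise returns as soon as the callback runs.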






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520987439


##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelCacheTest {
+
+  private ChannelCache cache;
+
+  private static ChannelCache newCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return ChannelCache.forTesting(channelFactory, () -> {});
+  }
+
+  @After
+  public void cleanUp() {
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  private ManagedChannel newChannel(String channelName) {
+    return WindmillChannelFactory.inProcessChannel(channelName);
+  }
+
+  @Test
+  public void testLoadingCacheReturnsExistingChannel() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    // Initial call to load the cache.
+    cache.get(someAddress);

Review Comment:
   done






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520986277


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java:
##
@@ -44,7 +44,7 @@
  * that each active rpc has its own channel.
  */
 @Internal
-class IsolationChannel extends ManagedChannel {
+public class IsolationChannel extends ManagedChannel {

Review Comment:
   Yeah, it's used in GrpcWindmillServer and will be used in 
StreamingEngineClient when we eventually inject it.
   
   We could create a Factory class to create the ChannelCache?






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520983499


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+
+public interface ChannelCachingStubFactory extends WindmillStubFactory {
+
+  /**
+   * Remove and close the gRPC channel used to communicate with the given {@link
+   * WindmillServiceAddress}.

Review Comment:
   done






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520977636


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a href=https://grpc.io/docs/guides/performance/#java>gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);
+  }
+
+  public static ChannelCache create(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        (address, channel, cause) -> shutdownChannel(channel));
+  }
+
+  @VisibleForTesting
+  static ChannelCache forTesting(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        // Add hook for testing so that we don't have to sleep/wait for arbitrary time in test.
+        (address, channel, cause) -> {
+          shutdownChannel(channel);
+          onChannelShutdown.run();
+        });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.get(windmillServiceAddress);
+  }
+
+  public void remove(WindmillServiceAddress windmillServiceAddress) {
+    channelCache.invalidate(windmillServiceAddress);
+  }
+
+  public void clear() {
+    channelCache.invalidateAll();
+  }
+
+  @VisibleForTesting
+  boolean isEmpty() {
+    // Perform any pending removal/insert operations first.
+    channelCache.cleanUp();

Review Comment:
   Yes, this does block; added a comment.
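
Related to blocking: `shutdownChannel` in the hunk above follows the usual shutdown, await, then force-shutdown sequence, and its `awaitTermination` call can block for up to 10 seconds. `ExecutorService` exposes methods with the same names, so the pattern can be sketched with the standard library alone. This is an illustration and not the PR's code; `GracefulShutdown` is a hypothetical helper, and restoring the interrupt flag is an addition that the quoted code does not make.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing the shutdown / awaitTermination / shutdownNow sequence. */
final class GracefulShutdown {
  private GracefulShutdown() {}

  /** Returns true if the service had terminated by the time this method returns. */
  static boolean shutdown(ExecutorService service, long timeout, TimeUnit unit) {
    // Stop accepting new work but let in-flight work finish.
    service.shutdown();
    try {
      if (service.awaitTermination(timeout, unit)) {
        return true;
      }
    } catch (InterruptedException e) {
      // Restore the flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
    }
    // Graceful shutdown timed out or was interrupted: cancel whatever is left.
    service.shutdownNow();
    return service.isTerminated();
  }
}
```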






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520973635


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a href=https://grpc.io/docs/guides/performance/#java>gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);

Review Comment:
   https://github.com/ben-manes/caffeine/wiki/Removal
   Looks like this is done asynchronously by default.
   
   We can pass in our own executor if we want, but will add a test.
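
To illustrate why the executor matters for testing, here is a stdlib-only stand-in for a loading cache whose removal listener runs on a caller-supplied executor. It is not Caffeine and makes no claim about Caffeine's internals: `ListenerCache` and all of its members are hypothetical names, and the only point is that a same-thread executor makes the listener run before `remove` returns.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical stand-in: a loading cache that notifies a removal listener via an executor. */
final class ListenerCache<K, V> {
  private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<>();
  private final Function<K, V> loader;
  private final BiConsumer<K, V> onRemoved;
  private final Executor listenerExecutor;

  ListenerCache(Function<K, V> loader, BiConsumer<K, V> onRemoved, Executor listenerExecutor) {
    this.loader = loader;
    this.onRemoved = onRemoved;
    this.listenerExecutor = listenerExecutor;
  }

  /** Returns the cached value, loading it on first access. */
  V get(K key) {
    return entries.computeIfAbsent(key, loader);
  }

  /** Removes the entry and hands the removed value to the listener on the executor. */
  void remove(K key) {
    V removed = entries.remove(key);
    if (removed != null) {
      listenerExecutor.execute(() -> onRemoved.accept(key, removed));
    }
  }

  int size() {
    return entries.size();
  }
}
```

Passing `Runnable::run` as the executor runs the listener on the calling thread, which keeps a test deterministic; a thread pool would make removal notifications asynchronous, in which case a test needs a hook or latch to know when the listener has finished.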






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-12 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520939553


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;

Review Comment:
   
   https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder.html
   Looks like Caffeine is preferred.






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-11 Thread via GitHub


Abacn commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520404949


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;

Review Comment:
   Would guava's cache satisfy the use case here? That is 
`com.google.common.cache` and is used by SDK harness: 
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-11 Thread via GitHub


Abacn commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520401764


##
runners/google-cloud-dataflow-java/worker/build.gradle:
##
@@ -71,6 +71,10 @@ def excluded_dependencies = [
 library.java.truth   // Test only
 ]
 
+// For Java8+ and less than Java11, use versions 2.x.x.
+// For Java11+ use versions 3.x.x per https://github.com/ben-manes/caffeine.
+def caffeine_cache_version = project.javaVersion < "11" ? "2.9.3" : "3.1.8"

Review Comment:
   This won't work for releases. `build.gradle` is just a build script that is 
executed when the artifact is built. The version string is resolved at build 
time, so the released dataflow-worker will always get 2.9.3 (the artifacts are 
compiled with a single Java version, Java 8, and then published to Maven).






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-11 Thread via GitHub


scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520151250


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);
+  }
+
+  public static ChannelCache create(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        (address, channel, cause) -> shutdownChannel(channel));
+  }
+
+  @VisibleForTesting
+  static ChannelCache forTesting(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        // Add hook for testing so that we don't have to sleep/wait for arbitrary time in test.
+        (address, channel, cause) -> {
+          shutdownChannel(channel);
+          onChannelShutdown.run();
+        });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.get(windmillServiceAddress);
+  }
+
+  public void remove(WindmillServiceAddress windmillServiceAddress) {
+    channelCache.invalidate(windmillServiceAddress);
+  }
+
+  public void clear() {
+    channelCache.invalidateAll();
+  }
+
+  @VisibleForTesting
+  boolean isEmpty() {
+    // Perform any pending removal/insert operations first.
+    channelCache.cleanUp();

Review Comment:
   Does this block on removal listeners if they run on an executor? Please add a comment.



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in 

Re: [PR] add a way for channels to be closed manually [beam]

2024-03-08 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1518234301


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##
@@ -102,7 +105,8 @@ private StreamingEngineClient(
   WindmillStubFactory stubFactory,
   GetWorkBudgetDistributor getWorkBudgetDistributor,
   GrpcDispatcherClient dispatcherClient,
-  long clientId) {
+  long clientId,
+  ChannelCache channelCache) {

Review Comment:
   done
   Added another interface that extends WindmillStubFactory to expose some of 
the caching behavior.






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-08 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1518206137


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
   Do we want to share the StubFactory between GrpcDispatcherClient and 
StreamingEngineClient? If yes it might be a good idea to create it externally 
and inject it into both objects.
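
   A minimal sketch of "create it externally and inject it into both objects", 
using only JDK classes. All names here (`SharedStubFactory`, 
`DispatcherClientSketch`, `EngineClientSketch`) are hypothetical stand-ins, not 
the PR's classes, and stubs are modelled as plain strings.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the stub factory; counts how many stubs it hands out.
final class SharedStubFactory {
  private final AtomicInteger created = new AtomicInteger();

  String createStub(String address) {
    return "stub#" + created.incrementAndGet() + "@" + address;
  }

  int createdCount() {
    return created.get();
  }
}

// Each client receives the factory through its constructor instead of building its own.
final class DispatcherClientSketch {
  private final SharedStubFactory stubFactory;

  DispatcherClientSketch(SharedStubFactory stubFactory) {
    this.stubFactory = stubFactory;
  }

  String connect(String address) {
    return stubFactory.createStub(address);
  }
}

final class EngineClientSketch {
  private final SharedStubFactory stubFactory;

  EngineClientSketch(SharedStubFactory stubFactory) {
    this.stubFactory = stubFactory;
  }

  String connect(String address) {
    return stubFactory.createStub(address);
  }
}
```

   Whoever wires the worker together creates one `SharedStubFactory` and passes 
the same instance to both constructors, so any state the factory holds (in the 
PR, the cached channels) is shared between the two clients.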






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-08 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1518205171


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder()
+            .removalListener(onChannelRemoved)
+            .build(
+                serviceAddress ->
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    useIsolatedChannels

Review Comment:
   done






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-08 Thread via GitHub


scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1517788866


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##
@@ -102,7 +105,8 @@ private StreamingEngineClient(
   WindmillStubFactory stubFactory,
   GetWorkBudgetDistributor getWorkBudgetDistributor,
   GrpcDispatcherClient dispatcherClient,
-  long clientId) {
+  long clientId,
+  ChannelCache channelCache) {

Review Comment:
   It seems like the ChannelCache overlaps with the stub factory.
   Maybe it would be better to merge them by taking in a 
ChannelCachingRemoteStubFactory?



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
   Can we move the channel cache usage to 
StreamingEngineClient/GrpcWindmillServer and below?
   
   It seems unnecessary to plumb it from up here, and given that we are 
coordinating invalidation at that level, it doesn't seem like we want to expose 
it here.



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder()
+            .removalListener(onChannelRemoved)
+            .build(
+                serviceAddress ->
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    useIsolatedChannels

Review Comment:
   It doesn't seem like IsolationChannel needs to be part of the cache; 
callers can just pass in a channelFactory that vends IsolationChannels if 
desired.
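
   The separation suggested here can be sketched with JDK classes alone. This is 
an illustration, not the PR's code: `FactoryBackedCache` and `decorated` are 
hypothetical names, and the wrapper plays the role that `IsolationChannel` has in 
the PR. The cache only knows "key to value factory"; any wrapping is composed into 
the factory by the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch: a cache that is unaware of how its values are wrapped.
final class FactoryBackedCache<K, V> {
  private final Map<K, V> entries = new ConcurrentHashMap<>();
  private final Function<K, V> factory;

  FactoryBackedCache(Function<K, V> factory) {
    this.factory = factory;
  }

  // Creates the value on first use and returns the cached one afterwards.
  V get(K key) {
    return entries.computeIfAbsent(key, factory);
  }

  // Helper for callers: wraps every value that the base factory produces.
  static <K, V> Function<K, V> decorated(Function<K, V> base, UnaryOperator<V> wrapper) {
    return base.andThen(wrapper);
  }
}
```

   A caller that wants wrapped values passes `decorated(baseFactory, wrapper)` to 
the constructor; a caller that does not simply passes `baseFactory`. The cache 
itself needs no boolean flag for either case.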






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-07 Thread via GitHub


m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1984164547

   @Abacn it looks like precommit is failing due to 
`validateShadedJarDoesntLeakNonProjectClasses`. How can I fix this when adding a 
new dependency? It is for the Caffeine cache. Thanks!





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-07 Thread via GitHub


scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515830736


##
runners/google-cloud-dataflow-java/worker/build.gradle:
##
@@ -137,6 +137,14 @@ applyJavaNature(
 relocate("org.eclipse.jetty", getWorkerRelocatedPath("org.eclipse.jetty"))
 relocate("javax.servlet", getWorkerRelocatedPath("javax.servlet"))
 
+// Use Caffeine cache instead of Guava cache.
+// Context: https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder
+// For Java8+ and less than Java11, use versions 2.x.x per https://github.com/ben-manes/caffeine.

Review Comment:
   We probably want the version to be conditional on the java version being 
used then.
   
   @Abacn Could you provide some expertise here? Does this require vendoring if 
it's just used within the worker? Any other gotchas about adding a dependency?






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-06 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515691626


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##
@@ -97,6 +97,14 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
 : randomlySelectNextStub(windmillServiceStubs));
   }
 
+  WindmillServiceAddress getWindmillServiceAddress() {
+ImmutableList endpoints =
+ImmutableList.copyOf(dispatcherStubs.get().dispatcherEndpoints());

Review Comment:
   realized we don't need this since the cache handles all of it






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-06 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515690054


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  public ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    this.channelCache =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
+                  @Override
+                  public ManagedChannel load(WindmillServiceAddress serviceAddress) {
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    return useIsolatedChannels
+                        ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
+                        : channelFactory.apply(serviceAddress);
+                  }
+                });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.getUnchecked(windmillServiceAddress);
+  }
+
+  public void removeAndClose(WindmillServiceAddress windmillServiceAddress) {
+    Optional.ofNullable(channelCache.getIfPresent(windmillServiceAddress))
+        .ifPresent(ChannelCache::shutdownChannel);

Review Comment:
   added the caffeine cache to worker build file






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-06 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515523277


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  public ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    this.channelCache =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
+                  @Override
+                  public ManagedChannel load(WindmillServiceAddress serviceAddress) {
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    return useIsolatedChannels
+                        ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
+                        : channelFactory.apply(serviceAddress);
+                  }
+                });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.getUnchecked(windmillServiceAddress);
+  }
+
+  public void removeAndClose(WindmillServiceAddress windmillServiceAddress) {
+    Optional.ofNullable(channelCache.getIfPresent(windmillServiceAddress))
+        .ifPresent(ChannelCache::shutdownChannel);

Review Comment:
   SGTM, added a removal listener to the current implementation.
   
   If we substitute Caffeine for the Guava cache, do we have to go through 
vendoring, or any kind of dependency review?






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-06 Thread via GitHub


scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1514388493


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##
@@ -97,6 +97,14 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
 : randomlySelectNextStub(windmillServiceStubs));
   }
 
+  WindmillServiceAddress getWindmillServiceAddress() {
+ImmutableList endpoints =
+ImmutableList.copyOf(dispatcherStubs.get().dispatcherEndpoints());

Review Comment:
   Just use getDispatcherEndpoints?
   It doesn't seem like we need to copy; we just need to make sure we use a 
consistent set by keeping a reference to it.
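
   A minimal sketch of "keep the reference instead of copying", using only JDK 
classes. `EndpointPicker`, `update` and `pick` are hypothetical names, and 
endpoints are modelled as strings rather than the PR's types. The assumption is 
that the endpoints are published as an immutable list; a reader then only has to 
read the reference once to get a consistent view.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: endpoints are published as an immutable list behind an AtomicReference.
final class EndpointPicker {
  private final AtomicReference<List<String>> endpoints = new AtomicReference<>();

  EndpointPicker(List<String> initial) {
    update(initial);
  }

  // The publisher stores an immutable list, so readers never need a defensive copy.
  void update(List<String> latest) {
    endpoints.set(Collections.unmodifiableList(new ArrayList<>(latest)));
  }

  // Reads the reference exactly once, so size() and get() see the same list
  // even if update() runs concurrently.
  String pick(int seed) {
    List<String> snapshot = endpoints.get();
    if (snapshot.isEmpty()) {
      throw new IllegalStateException("no endpoints");
    }
    return snapshot.get(Math.floorMod(seed, snapshot.size()));
  }
}
```

   The bug this avoids is calling the getter twice (once for the size, once for 
the element) and having the set change in between.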



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows https://grpc.io/docs/guides/performance/#java>gRPC recommendations 
for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  public ChannelCache(
+  boolean useIsolatedChannels,
+  Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+this.channelCache =
+CacheBuilder.newBuilder()
+.build(
+new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
+  @Override
+  public ManagedChannel load(WindmillServiceAddress serviceAddress) {
+// IsolationChannel will create and manage separate RPC channels to the same
+// serviceAddress via calling the channelFactory, else just directly return the
+// RPC channel.
+return useIsolatedChannels
+? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
+: channelFactory.apply(serviceAddress);
+  }
+});
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+channel.shutdown();
+try {
+  channel.awaitTermination(10, TimeUnit.SECONDS);
+} catch (InterruptedException e) {
+  LOG.error("Couldn't close gRPC channel={}", channel, e);
+}
+channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+return channelCache.getUnchecked(windmillServiceAddress);
+  }
+
+  public void removeAndClose(WindmillServiceAddress windmillServiceAddress) {
+Optional.ofNullable(channelCache.getIfPresent(windmillServiceAddress))
+.ifPresent(ChannelCache::shutdownChannel);

Review Comment:
   It would be nice if this didn't block the caller and instead happened in the 
background, since it is only there to help clean up. Otherwise, if a metadata 
update triggers the removal of X windmill worker endpoints, the caller could be 
delayed by up to 10*X seconds.
   
   It also looks like Caffeine is recommended over Guava's `CacheBuilder`, 
according to the 
[javadoc](https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder.html).
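
A minimal sketch of the non-blocking close suggested in the comment above, not the PR's implementation. `ClosableChannel` and `BackgroundChannelCloser` are hypothetical names; the interface only mirrors the three `ManagedChannel` methods that `shutdownChannel` calls (in gRPC, `shutdown()` and `shutdownNow()` return the channel, so a real version would need a small adapter). The executor and the grace period are assumptions supplied by the caller.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the ManagedChannel methods used during shutdown. */
interface ClosableChannel {
  void shutdown();

  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

  void shutdownNow();
}

/** Hypothetical helper that closes channels on an executor so the caller never waits. */
final class BackgroundChannelCloser {
  private final ExecutorService closer;
  private final long gracePeriodMillis;

  BackgroundChannelCloser(ExecutorService closer, long gracePeriodMillis) {
    this.closer = closer;
    this.gracePeriodMillis = gracePeriodMillis;
  }

  /** Returns immediately; the graceful and then forced shutdown run on the executor. */
  void closeAsync(ClosableChannel channel) {
    closer.execute(
        () -> {
          channel.shutdown();
          try {
            if (channel.awaitTermination(gracePeriodMillis, TimeUnit.MILLISECONDS)) {
              return; // Terminated within the grace period, nothing left to force.
            }
          } catch (InterruptedException e) {
            // Preserve the interrupt for the executor thread, then fall through to force close.
            Thread.currentThread().interrupt();
          }
          channel.shutdownNow();
        });
  }
}
```

With this shape, removing X endpoints costs the caller X task submissions instead of up to 10*X seconds of waiting. How many channels close concurrently is decided by the executor the caller passes in.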
   
 

Re: [PR] add a way for channels to be closed manually [beam]

2024-03-05 Thread via GitHub


m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1979741093

   back to you @scwhittle thanks!





Re: [PR] add a way for channels to be closed manually [beam]

2024-03-05 Thread via GitHub


m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1513573432


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##
@@ -250,6 +257,22 @@ public GetWorkBudget remainingBudget() {
 .apply(currentInflightBudget);
   }
 
+  @Override
+  public synchronized void close() {
+super.close();
+if (!rpcChannel.isShutdown()) {

Review Comment:
   Yeah, this is what I was initially thinking, but I opted to use the cache and 
handle the closing of the channels in `StreamingEngineClient`.
   
   I think the `channelCache` encapsulates the logic well and makes it more 
testable?






Re: [PR] add a way for channels to be closed manually [beam]

2024-03-04 Thread via GitHub


scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1510885621


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##
@@ -250,6 +257,22 @@ public GetWorkBudget remainingBudget() {
 .apply(currentInflightBudget);
   }
 
+  @Override
+  public synchronized void close() {
+super.close();
+if (!rpcChannel.isShutdown()) {

Review Comment:
   It seems like this could shut down a channel that is still in the cache?
   The cache should be responsible for shutting down channels once it is sure 
that they are no longer being used.
   
   If we need to track who is using the endpoint, do we need the cache? It 
seems like we could instead have the `StreamingEngineClient` cache the channels 
internally and shut them down when the worker endpoints change.
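
A rough sketch of the alternative described in the comment above: a single owner keeps one channel per endpoint and closes the channels whose endpoints disappear from the latest update. `EndpointChannels` and its methods are hypothetical names, and the class is generic so the sketch carries no gRPC dependency; in the PR's terms `E` would be `WindmillServiceAddress` and `C` would be `ManagedChannel`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical owner of per-endpoint channels; the only place that creates and closes them. */
final class EndpointChannels<E, C> {
  private final Map<E, C> channels = new HashMap<>();
  private final Function<E, C> channelFactory;
  private final Consumer<C> channelCloser;

  EndpointChannels(Function<E, C> channelFactory, Consumer<C> channelCloser) {
    this.channelFactory = channelFactory;
    this.channelCloser = channelCloser;
  }

  /** Closes channels for endpoints that are no longer listed and creates missing ones. */
  synchronized void onEndpointsChanged(Set<E> currentEndpoints) {
    Iterator<Map.Entry<E, C>> it = channels.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<E, C> entry = it.next();
      if (!currentEndpoints.contains(entry.getKey())) {
        C removed = entry.getValue(); // Read the value before removing the entry.
        it.remove();
        channelCloser.accept(removed);
      }
    }
    for (E endpoint : currentEndpoints) {
      // Existing channels are reused; the factory only runs for new endpoints.
      channels.computeIfAbsent(endpoint, channelFactory);
    }
  }

  /** Returns the channel for the endpoint, or null if the endpoint is not currently tracked. */
  synchronized C get(E endpoint) {
    return channels.get(endpoint);
  }
}
```

Because the owner removes the entry before closing the channel, a stream can never be handed a channel that has already been shut down, which is the concern raised about closing the channel from the stream's `close()`.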






Re: [PR] add a way for channels to be closed manually [beam]

2024-02-29 Thread via GitHub


github-actions[bot] commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1972055861

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] add a way for channels to be closed manually [beam]

2024-02-29 Thread via GitHub


m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1972054383

   R: @scwhittle 
   
   This is for reuse of channels. It matters more on the direct path, since 
we will close streams (and their backing channels, which point to the backend 
workers) when we receive new metadata.
   
   Thanks!





Re: [PR] add a way for channels to be closed manually [beam]

2024-02-26 Thread via GitHub


github-actions[bot] commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1965332725

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`

