Re: [PR] add a way for channels to be closed manually [beam]
kennknowles commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2022873151

I quickly checked, and the missing class has been in place for 10 years, so you can ignore the second bullet.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] add a way for channels to be closed manually [beam]
kennknowles commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2022870744

> @Abacn @kennknowles It looks like it cannot find a dependency I added.
>
> I only added it to this gradle file: https://github.com/apache/beam/pull/30425/files#diff-c639d6f102d06d6d74d4d5c0d829d517ca720bf0b95708d10c86cbd4b847c65d
>
> Do I need to add it elsewhere?

Just to clarify: the library itself was successfully downloaded. Your dependency looks about right, but you should put it in the central list of dependencies in `BeamModulePlugin` like all the others, so we coordinate versions across modules.

Common sources of this problem that I can think of:

- Someone used a `provided` or `compileOnly` scope dependency somewhere in the transitive chain, so the class was not put on the runtime classpath (not your fault, but it needs to be solved by turning it into an `implementation` dependency).
- Another module depends on the same library at an earlier version, so that particular class is not found because the earlier version took precedence (the rule for precedence is "closer to the root of the transitive dependency chain").
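The precedence rule quoted in the second bullet matches Maven's "nearest definition wins" mediation (Gradle's default conflict resolution differs: it selects the highest requested version unless configured otherwise). A minimal sketch of nearest-wins over a hypothetical in-memory dependency graph; the `Dep` type and the artifact names are illustrative only, not a real build-tool API:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of Maven-style "nearest wins" mediation over a hypothetical dependency graph: an
 * artifact resolves to the version declared closest to the root; ties at equal depth go to the
 * first declaration.
 */
final class NearestWinsMediation {

  /** Hypothetical dependency node: artifact name, version, and its own dependencies. */
  static final class Dep {
    final String artifact;
    final String version;
    final List<Dep> children;

    Dep(String artifact, String version, List<Dep> children) {
      this.artifact = artifact;
      this.version = version;
      this.children = children;
    }

    static Dep of(String artifact, String version, Dep... children) {
      return new Dep(artifact, version, Arrays.asList(children));
    }
  }

  /** Breadth-first walk: the first occurrence seen for an artifact is its shallowest one. */
  static Map<String, String> resolve(List<Dep> directDeps) {
    Map<String, String> selected = new LinkedHashMap<>();
    Deque<Dep> queue = new ArrayDeque<>(directDeps);
    while (!queue.isEmpty()) {
      Dep dep = queue.removeFirst();
      if (!selected.containsKey(dep.artifact)) {
        selected.put(dep.artifact, dep.version);
        // Only the winning occurrence contributes its own transitive dependencies.
        queue.addAll(dep.children);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    // caffeine appears at depth 2 (via worker-lib) and at depth 3 (via other-lib -> middle-lib).
    List<Dep> direct =
        Arrays.asList(
            Dep.of("worker-lib", "1.0", Dep.of("caffeine", "2.9.3")),
            Dep.of("other-lib", "1.0", Dep.of("middle-lib", "1.0", Dep.of("caffeine", "3.1.8"))));
    System.out.println(resolve(direct).get("caffeine")); // 2.9.3: the shallower declaration wins
  }
}
```

Breadth-first order visits shallower occurrences first and, within one depth, follows declaration order, which is how ties are broken in this model.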
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2011033112

@Abacn @kennknowles It looks like it cannot find a dependency I added.

I only added it to this gradle file: https://github.com/apache/beam/pull/30425/files#diff-c639d6f102d06d6d74d4d5c0d829d517ca720bf0b95708d10c86cbd4b847c65d

Do I need to add it elsewhere?
Re: [PR] add a way for channels to be closed manually [beam]
Abacn commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2010319013

This now blocks open Java PRs; we should revert it and then investigate.
Re: [PR] add a way for channels to be closed manually [beam]
Abacn commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2010317923

It is related: the push trigger for this PR is the first failing run: https://github.com/apache/beam/actions/runs/8345017925

![image](https://github.com/apache/beam/assets/8010435/81593bbd-3ade-498e-a321-2b642ff4769c)
Re: [PR] add a way for channels to be closed manually [beam]
kennknowles commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2009858870

On #30545 I'm seeing this failure crashlooping the streaming wordcount integration tests:

```
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/github/benmanes/caffeine/cache/RemovalCause
2024-03-20 10:54:38.890 EDT at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache.create(ChannelCache.java:55)
2024-03-20 10:54:38.891 EDT at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer.create(GrpcWindmillServer.java:164)
2024-03-20 10:54:38.891 EDT at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.createWindmillServerStub(StreamingDataflowWorker.java:625)
2024-03-20 10:54:38.891 EDT at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.fromOptions(StreamingDataflowWorker.java:466)
2024-03-20 10:54:38.891 EDT at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.main(StreamingDataflowWorker.java:577)
```

Seems likely related?
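When a `NoClassDefFoundError` like this shows up at runtime even though compilation succeeded, it can help to check from inside the failing JVM whether the class is loadable at all and which jar provides it. The `BootstrapMethodError` wrapper is consistent with the failure surfacing while linking the removal-listener lambda in `ChannelCache.create` (quoted further down this thread), whose signature refers to `RemovalCause`, though that is an inference from the trace. A small hypothetical diagnostic (`ClasspathProbe` is not part of Beam) using only `Class.forName` and the class's `CodeSource`:

```java
import java.security.CodeSource;
import java.util.Optional;

/**
 * Hypothetical diagnostic: reports whether a class can be loaded and, if so, where it was loaded
 * from. Useful when a dependency is missing from the runtime classpath or shadowed by another
 * version of the same library.
 */
final class ClasspathProbe {

  /** Returns the class's load location, or empty if it cannot be loaded. */
  static Optional<String> locate(String className) {
    try {
      // initialize=false: load and link the class without running its static initializers.
      Class<?> clazz = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      CodeSource source = clazz.getProtectionDomain().getCodeSource();
      // JDK classes loaded by the bootstrap loader typically have no CodeSource.
      return Optional.of(
          source == null ? "<bootstrap or unknown>" : String.valueOf(source.getLocation()));
    } catch (ClassNotFoundException | LinkageError e) {
      // LinkageError covers NoClassDefFoundError, e.g. when a superclass is missing.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    String name = args.length > 0 ? args[0] : "com.github.benmanes.caffeine.cache.RemovalCause";
    System.out.println(name + " -> " + locate(name).orElse("NOT FOUND"));
  }
}
```

Running it with the worker's runtime classpath distinguishes "not on the classpath at all" (the first bullet in the reply above) from "present, but from an unexpected jar" (the second bullet).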
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle merged PR #30425:
URL: https://github.com/apache/beam/pull/30425
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2004876597

Run Java PreCommit
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2004876092

> Task :runners:google-cloud-dataflow-java:worker:test

org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannelTest > testAwaitTermination FAILED
    org.mockito.exceptions.verification.TooFewActualInvocations at IsolationChannelTest.java:401
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-2004659417

Run Java Precommit
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1998921403

@scwhittle ready for another look! We can switch all of the existing streaming caches to Caffeine in a later PR.
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1521880582

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
done
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1521004735

##
runners/google-cloud-dataflow-java/worker/build.gradle:
##
@@ -71,6 +71,10 @@ def excluded_dependencies = [
   library.java.truth // Test only
 ]
+// For Java8+ and less than Java11, use versions 2.x.x.
+// For Java11+ use versions 3.x.x per https://github.com/ben-manes/caffeine.
+def caffeine_cache_version = project.javaVersion < "11" ? "2.9.3" : "3.1.8"

Review Comment:
Removed the branch, defaulting to the lower version.
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1521002708

##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelCacheTest {
+
+  private ChannelCache cache;
+
+  private static ChannelCache newCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return ChannelCache.forTesting(channelFactory, () -> {});
+  }
+
+  @After
+  public void cleanUp() {
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  private ManagedChannel newChannel(String channelName) {
+    return WindmillChannelFactory.inProcessChannel(channelName);
+  }
+
+  @Test
+  public void testLoadingCacheReturnsExistingChannel() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    // Initial call to load the cache.
+    cache.get(someAddress);
+
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    assertSame(channel, cachedChannel);
+    verify(channelFactory, times(1)).apply(eq(someAddress));
+  }
+
+  @Test
+  public void testLoadingCacheReturnsLoadsChannelWhenNotPresent() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    assertSame(channel, cachedChannel);
+    verify(channelFactory, times(1)).apply(eq(someAddress));
+  }
+
+  @Test
+  public void testRemoveAndClose() throws InterruptedException {

Review Comment:
done
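The two tests above check a loading-cache property: the factory runs once for a given key, and later lookups return the same channel instance. A stdlib-only sketch of that property, using a hypothetical `SimpleLoadingCache` built on `ConcurrentHashMap.computeIfAbsent` (this is not Beam's `ChannelCache`, which delegates to Caffeine's `LoadingCache`):

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical minimal loading cache: the loader runs at most once per key while cached. */
final class SimpleLoadingCache<K, V> {
  private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<>();
  private final Function<K, V> loader;

  SimpleLoadingCache(Function<K, V> loader) {
    this.loader = Objects.requireNonNull(loader);
  }

  /** Returns the cached value, invoking the loader only if the key is absent. */
  V get(K key) {
    return entries.computeIfAbsent(key, loader);
  }

  /** Drops the entry so that the next get() loads a fresh value. */
  void invalidate(K key) {
    entries.remove(key);
  }

  public static void main(String[] args) {
    AtomicInteger loads = new AtomicInteger();
    SimpleLoadingCache<String, Object> cache =
        new SimpleLoadingCache<>(
            address -> {
              loads.incrementAndGet();
              return new Object(); // stands in for a newly created channel
            });
    Object first = cache.get("windmill-address");
    Object second = cache.get("windmill-address");
    System.out.println(first == second); // true: same instance returned
    System.out.println(loads.get()); // 1: the loader ran once
  }
}
```

Counting loader invocations directly plays the same role as `verify(channelFactory, times(1))` in the Mockito-based tests.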
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520987439

##
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelCacheTest {
+
+  private ChannelCache cache;
+
+  private static ChannelCache newCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return ChannelCache.forTesting(channelFactory, () -> {});
+  }
+
+  @After
+  public void cleanUp() {
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  private ManagedChannel newChannel(String channelName) {
+    return WindmillChannelFactory.inProcessChannel(channelName);
+  }
+
+  @Test
+  public void testLoadingCacheReturnsExistingChannel() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    // Initial call to load the cache.
+    cache.get(someAddress);

Review Comment:
done
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520986277

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java:
##
@@ -44,7 +44,7 @@
  * that each active rpc has its own channel.
  */
 @Internal
-class IsolationChannel extends ManagedChannel {
+public class IsolationChannel extends ManagedChannel {

Review Comment:
Yeah, it's used in GrpcWindmillServer and will be used in StreamingEngineClient when we eventually inject it. We could create a factory class to create the ChannelCache?
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520983499

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+
+public interface ChannelCachingStubFactory extends WindmillStubFactory {
+
+  /**
+   * Remove and close the gRPC channel used to communicate with the given {@link
+   * WindmillServiceAddress}.

Review Comment:
done
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520977636

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);
+  }
+
+  public static ChannelCache create(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        (address, channel, cause) -> shutdownChannel(channel));
+  }
+
+  @VisibleForTesting
+  static ChannelCache forTesting(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        // Add hook for testing so that we don't have to sleep/wait for arbitrary time in test.
+        (address, channel, cause) -> {
+          shutdownChannel(channel);
+          onChannelShutdown.run();
+        });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.get(windmillServiceAddress);
+  }
+
+  public void remove(WindmillServiceAddress windmillServiceAddress) {
+    channelCache.invalidate(windmillServiceAddress);
+  }
+
+  public void clear() {
+    channelCache.invalidateAll();
+  }
+
+  @VisibleForTesting
+  boolean isEmpty() {
+    // Perform any pending removal/insert operations first.
+    channelCache.cleanUp();

Review Comment:
Yes, this does block. Added a comment.
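`shutdownChannel` follows the usual two-phase pattern: `shutdown()`, a bounded `awaitTermination`, then `shutdownNow()`. `ExecutorService` exposes methods with the same names and analogous semantics, so the sketch below uses it as a stand-in for `ManagedChannel` to stay runnable without gRPC. It also restores the thread's interrupt flag when `awaitTermination` is interrupted, a common idiom that the quoted code does not apply (it only logs); whether that matters here depends on the caller. `GracefulShutdown` is a hypothetical helper, not Beam code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper applying the shutdown / awaitTermination / shutdownNow sequence. */
final class GracefulShutdown {

  /** Returns true if the service terminated within the grace period without being forced. */
  static boolean shutdownGracefully(ExecutorService service, long timeout, TimeUnit unit) {
    service.shutdown(); // Stop accepting new work; let in-flight work finish.
    boolean terminated = false;
    try {
      terminated = service.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      // Preserve the interrupt so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    } finally {
      if (!terminated) {
        service.shutdownNow(); // Cancel whatever is still running or queued.
      }
    }
    return terminated;
  }

  public static void main(String[] args) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(() -> System.out.println("in-flight work"));
    System.out.println(shutdownGracefully(service, 10, TimeUnit.SECONDS));
  }
}
```

Calling `shutdownNow()` only when the grace period expires keeps the forced path explicit; calling it unconditionally, as the quoted code does, is also harmless once termination has completed.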
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520973635

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);

Review Comment:
Per https://github.com/ben-manes/caffeine/wiki/Removal, it looks like this is done asynchronously by default. We can pass in our own executor if we want, but I will add a test.
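The Caffeine wiki page linked above describes removal listeners as running asynchronously on an `Executor` (by default `ForkJoinPool.commonPool()`), and the builder's `executor(...)` method can override it; for example, `executor(Runnable::run)` runs listeners on the calling thread. That asynchrony is why `forTesting` takes an `onChannelShutdown` hook: a test can count down a latch in the hook instead of sleeping. A stdlib-only sketch of the idea, with a hypothetical `AsyncRemovalCache` that models only explicit invalidation and replacement (no size- or time-based eviction):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical cache whose removal notifications run on a caller-supplied Executor. */
final class AsyncRemovalCache<K, V> {
  private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<>();
  private final BiConsumer<K, V> removalListener;
  private final Executor listenerExecutor;

  AsyncRemovalCache(BiConsumer<K, V> removalListener, Executor listenerExecutor) {
    this.removalListener = removalListener;
    this.listenerExecutor = listenerExecutor;
  }

  void put(K key, V value) {
    V previous = entries.put(key, value);
    if (previous != null && previous != value) {
      notifyRemoval(key, previous); // A replaced value is also "removed".
    }
  }

  void invalidate(K key) {
    V removed = entries.remove(key);
    if (removed != null) {
      notifyRemoval(key, removed);
    }
  }

  private void notifyRemoval(K key, V value) {
    // Handed off to the executor: with a thread pool, invalidate() may return before this runs.
    listenerExecutor.execute(() -> removalListener.accept(key, value));
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch shutdownHookRan = new CountDownLatch(1);
    AsyncRemovalCache<String, String> cache =
        new AsyncRemovalCache<>((address, channel) -> shutdownHookRan.countDown(), pool);
    cache.put("windmill-address", "channel-1");
    cache.invalidate("windmill-address");
    // Wait on the hook instead of sleeping for an arbitrary duration.
    System.out.println(shutdownHookRan.await(10, TimeUnit.SECONDS));
    pool.shutdown();
  }
}
```

Passing a same-thread executor such as `Runnable::run` makes notifications synchronous, which is the other option mentioned in the review comment.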
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520939553

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;

Review Comment:
Per https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder.html, it looks like Caffeine is preferred.
Re: [PR] add a way for channels to be closed manually [beam]
Abacn commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520404949

##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;

Review Comment:
Would Guava's cache satisfy the use case here? That is `com.google.common.cache`, and it is used by the SDK harness: https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java
Re: [PR] add a way for channels to be closed manually [beam]
Abacn commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520401764

##
runners/google-cloud-dataflow-java/worker/build.gradle:
##
@@ -71,6 +71,10 @@ def excluded_dependencies = [
   library.java.truth // Test only
 ]
+// For Java8+ and less than Java11, use versions 2.x.x.
+// For Java11+ use versions 3.x.x per https://github.com/ben-manes/caffeine.
+def caffeine_cache_version = project.javaVersion < "11" ? "2.9.3" : "3.1.8"

Review Comment:
This won't work for release. build.gradle is just a build script, executed when the artifact is built. The version string is resolved at build time, so the released dataflow-worker will always get 2.9.3 (because the artifacts are compiled with a single Java version, Java 8, and then published to Maven).
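The point above is that the ternary in `build.gradle` runs once, on the JDK doing the build, and its result is baked into the published artifact's dependency metadata; nothing in it can react to the JVM a Dataflow worker later runs on. Only code executing at runtime sees that JVM. As an illustration, a hypothetical helper (`JavaVersionProbe` is not Beam code) that reads the running JVM's major version from the standard `java.specification.version` property, which reports `1.8` on Java 8 and `11`, `17`, and so on afterwards:

```java
/** Hypothetical helper: determines the running JVM's major version at runtime. */
final class JavaVersionProbe {

  /** Parses "java.specification.version" values: "1.8" means 8; "11", "17", ... map directly. */
  static int majorVersion(String specVersion) {
    String v = specVersion.trim();
    if (v.startsWith("1.")) {
      v = v.substring(2); // Legacy numbering scheme used up to Java 8.
    }
    int dot = v.indexOf('.');
    return Integer.parseInt(dot >= 0 ? v.substring(0, dot) : v);
  }

  static int currentMajorVersion() {
    return majorVersion(System.getProperty("java.specification.version"));
  }

  public static void main(String[] args) {
    int major = currentMajorVersion();
    // Same 2.x / 3.x split as the build.gradle comment, but evaluated on the running JVM.
    String compatibleLine = major < 11 ? "2.x" : "3.x";
    System.out.println("Running on Java " + major + "; Caffeine " + compatibleLine + " would fit");
  }
}
```

Knowing the runtime version does not let an already-published artifact swap its dependency versions, which is consistent with the follow-up in this thread of removing the branch and defaulting to the lower version.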
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520151250

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,114 @@
[... ASF license header elided ...]
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);
+  }
+
+  public static ChannelCache create(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        (address, channel, cause) -> shutdownChannel(channel));
+  }
+
+  @VisibleForTesting
+  static ChannelCache forTesting(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        // Add hook for testing so that we don't have to sleep/wait for arbitrary time in test.
+        (address, channel, cause) -> {
+          shutdownChannel(channel);
+          onChannelShutdown.run();
+        });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.get(windmillServiceAddress);
+  }
+
+  public void remove(WindmillServiceAddress windmillServiceAddress) {
+    channelCache.invalidate(windmillServiceAddress);
+  }
+
+  public void clear() {
+    channelCache.invalidateAll();
+  }
+
+  @VisibleForTesting
+  boolean isEmpty() {
+    // Perform any pending removal/insert operations first.
+    channelCache.cleanUp();

Review Comment:
Does this block for removal listeners if they are on an executor? Add a comment.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,114 @@
[... ASF license header elided ...]
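On the question above: as far as the Caffeine documentation describes it, removal listeners run asynchronously on the builder's executor (`ForkJoinPool.commonPool()` by default), so `cleanUp()` should not be expected to wait for them; that is presumably why `forTesting` takes an `onChannelShutdown` hook. The JDK-only sketch below models the situation under stated assumptions: `HookedChannel` is a hypothetical stand-in for `ManagedChannel`, a single-thread executor stands in for Caffeine's listener executor, and the caller waits on a `CountDownLatch` fired by the hook instead of sleeping.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a gRPC ManagedChannel.
final class HookedChannel {
  private final AtomicBoolean shutdown = new AtomicBoolean(false);

  void shutdown() {
    shutdown.set(true);
  }

  boolean isShutdown() {
    return shutdown.get();
  }
}

// Models a removal listener that runs on an executor rather than on the caller's thread.
final class AsyncRemovalListenerDemo {
  /** Schedules the shutdown asynchronously, then waits for the test hook (not a sleep). */
  static boolean invalidateAndAwaitHook(HookedChannel channel, long timeoutMillis) {
    ExecutorService listenerExecutor = Executors.newSingleThreadExecutor();
    CountDownLatch onChannelShutdown = new CountDownLatch(1);
    try {
      // "Invalidation" only enqueues the listener; this call returns immediately.
      listenerExecutor.execute(
          () -> {
            channel.shutdown();
            onChannelShutdown.countDown(); // plays the role of the onChannelShutdown hook
          });
      // Block until the hook fires or the timeout elapses.
      return onChannelShutdown.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      listenerExecutor.shutdown();
    }
  }
}
```

With Caffeine itself, a test could alternatively configure `.executor(Runnable::run)` on the builder so the listener runs on the calling thread; whether that is preferable to the hook is a judgment call.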
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1518234301

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java: ##
@@ -102,7 +105,8 @@ private StreamingEngineClient(
       WindmillStubFactory stubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      long clientId) {
+      long clientId,
+      ChannelCache channelCache) {

Review Comment:
Done. Added another interface that extends WindmillStubFactory to expose some of the caching behavior.
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1518206137

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ##
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
Do we want to share the StubFactory between GrpcDispatcherClient and StreamingEngineClient? If so, it might be a good idea to create it externally and inject it into both objects.
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1518205171

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,129 @@
[... ASF license header elided ...]
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder()
+            .removalListener(onChannelRemoved)
+            .build(
+                serviceAddress ->
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    useIsolatedChannels

Review Comment:
Done.
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1517788866

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java: ##
@@ -102,7 +105,8 @@ private StreamingEngineClient(
       WindmillStubFactory stubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      long clientId) {
+      long clientId,
+      ChannelCache channelCache) {

Review Comment:
It seems like the ChannelCache overlaps with the stub factory. Maybe it would be better to merge them by taking in a ChannelCachingRemoteStubFactory?

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ##
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
Can we move the channel cache usage down to StreamingEngineClient/GrpcWindmillServer and below? It seems unnecessary to plumb it from up here, and given that we are coordinating invalidation at that level, it doesn't seem like we want to expose it here.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,129 @@
[... ASF license header elided ...]
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder()
+            .removalListener(onChannelRemoved)
+            .build(
+                serviceAddress ->
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    useIsolatedChannels

Review Comment:
It doesn't seem like IsolationChannel needs to be part of the cache; callers can just pass in a channelFactory that vends IsolationChannels if desired.
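The suggestion to keep IsolationChannel out of the cache can be illustrated with a small JDK-only sketch: the cache only memoizes whatever its factory returns, and the caller decides whether that factory wraps channels. `SketchChannel`, `PlainSketchChannel`, `IsolatedSketchChannel` and `SketchChannelCache` are hypothetical stand-ins for `ManagedChannel`, a plain gRPC channel, `IsolationChannel` and `ChannelCache`; `ConcurrentHashMap.computeIfAbsent` stands in for the `LoadingCache`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for ManagedChannel.
interface SketchChannel {
  String describe();
}

// Hypothetical plain channel to a single address.
final class PlainSketchChannel implements SketchChannel {
  private final String address;

  PlainSketchChannel(String address) {
    this.address = address;
  }

  @Override
  public String describe() {
    return "plain:" + address;
  }
}

// Hypothetical stand-in for IsolationChannel: it is handed a factory and could
// create further underlying channels to the same address (not exercised here).
final class IsolatedSketchChannel implements SketchChannel {
  private final Supplier<SketchChannel> channelSupplier;
  private final SketchChannel first;

  IsolatedSketchChannel(Supplier<SketchChannel> channelSupplier) {
    this.channelSupplier = channelSupplier;
    this.first = channelSupplier.get();
  }

  @Override
  public String describe() {
    return "isolated(" + first.describe() + ")";
  }
}

// The cache only memoizes what its factory returns; it has no isolation flag.
final class SketchChannelCache {
  private final Map<String, SketchChannel> channels = new ConcurrentHashMap<>();
  private final Function<String, SketchChannel> channelFactory;

  SketchChannelCache(Function<String, SketchChannel> channelFactory) {
    this.channelFactory = channelFactory;
  }

  SketchChannel get(String address) {
    return channels.computeIfAbsent(address, channelFactory);
  }

  // The caller, not the cache, decides whether channels are wrapped.
  static Function<String, SketchChannel> factoryFor(boolean useIsolatedChannels) {
    Function<String, SketchChannel> plain = PlainSketchChannel::new;
    if (!useIsolatedChannels) {
      return plain;
    }
    return address -> new IsolatedSketchChannel(() -> plain.apply(address));
  }
}
```

Keeping the wrapping in the factory means the cache constructor no longer needs a `useIsolatedChannels` parameter.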
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1984164547

@Abacn, it looks like precommit is failing due to `validateShadedJarDoesntLeakNonProjectClasses`. How can I fix this when adding a new dependency? It is for the Caffeine cache. Thanks!
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515830736

## runners/google-cloud-dataflow-java/worker/build.gradle: ##
@@ -137,6 +137,14 @@ applyJavaNature(
     relocate("org.eclipse.jetty", getWorkerRelocatedPath("org.eclipse.jetty"))
     relocate("javax.servlet", getWorkerRelocatedPath("javax.servlet"))
 
+    // Use Caffeine cache instead of Guava cache.
+    // Context: https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder
+    // For Java8+ and less than Java11, use versions 2.x.x per https://github.com/ben-manes/caffeine.

Review Comment:
We probably want the version to be conditional on the Java version being used, then.
@Abacn Could you provide some expertise here? Does this require vendoring if it's just used within the worker? Any other gotchas about adding a dependency?
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515691626

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java: ##
@@ -97,6 +97,14 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
         : randomlySelectNextStub(windmillServiceStubs));
   }
 
+  WindmillServiceAddress getWindmillServiceAddress() {
+    ImmutableList endpoints =
+        ImmutableList.copyOf(dispatcherStubs.get().dispatcherEndpoints());

Review Comment:
Realized we don't need this, since the cache handles all of it.
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515690054

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,120 @@
[... ASF license header elided ...]
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  public ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    this.channelCache =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
+                  @Override
+                  public ManagedChannel load(WindmillServiceAddress serviceAddress) {
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    return useIsolatedChannels
+                        ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
+                        : channelFactory.apply(serviceAddress);
+                  }
+                });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.getUnchecked(windmillServiceAddress);
+  }
+
+  public void removeAndClose(WindmillServiceAddress windmillServiceAddress) {
+    Optional.ofNullable(channelCache.getIfPresent(windmillServiceAddress))
+        .ifPresent(ChannelCache::shutdownChannel);

Review Comment:
Added the Caffeine cache to the worker build file.
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1515523277

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,120 @@
[... ASF license header elided ...]
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  public ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    this.channelCache =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
+                  @Override
+                  public ManagedChannel load(WindmillServiceAddress serviceAddress) {
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    return useIsolatedChannels
+                        ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
+                        : channelFactory.apply(serviceAddress);
+                  }
+                });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.getUnchecked(windmillServiceAddress);
+  }
+
+  public void removeAndClose(WindmillServiceAddress windmillServiceAddress) {
+    Optional.ofNullable(channelCache.getIfPresent(windmillServiceAddress))
+        .ifPresent(ChannelCache::shutdownChannel);

Review Comment:
sgtm, added a removal listener to the current implementation. If we substitute the Guava cache with the Caffeine cache, do we have to go through vendoring, or any kind of dependency review?
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1514388493

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java: ##
@@ -97,6 +97,14 @@ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
         : randomlySelectNextStub(windmillServiceStubs));
   }
 
+  WindmillServiceAddress getWindmillServiceAddress() {
+    ImmutableList endpoints =
+        ImmutableList.copyOf(dispatcherStubs.get().dispatcherEndpoints());

Review Comment:
Just use getDispatcherEndpoints? It doesn't seem like we need to copy; we just need to make sure we use a consistent set by keeping the reference to it.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ##
@@ -0,0 +1,120 @@
[... ASF license header elided ...]
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href="https://grpc.io/docs/guides/performance/#java">gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  public ChannelCache(
+      boolean useIsolatedChannels,
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    this.channelCache =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
+                  @Override
+                  public ManagedChannel load(WindmillServiceAddress serviceAddress) {
+                    // IsolationChannel will create and manage separate RPC channels to the same
+                    // serviceAddress via calling the channelFactory, else just directly return the
+                    // RPC channel.
+                    return useIsolatedChannels
+                        ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
+                        : channelFactory.apply(serviceAddress);
+                  }
+                });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.getUnchecked(windmillServiceAddress);
+  }
+
+  public void removeAndClose(WindmillServiceAddress windmillServiceAddress) {
+    Optional.ofNullable(channelCache.getIfPresent(windmillServiceAddress))
+        .ifPresent(ChannelCache::shutdownChannel);

Review Comment:
It would be nice if this didn't block the caller but happened in the background, as it is just to help clean up. Otherwise, if we get some metadata update that triggers removing X windmill worker endpoints, we could delay up to 10*X seconds. It looks like Caffeine is recommended over Guava's CacheBuilder per the [javadoc](https://guava.dev/releases/snapshot/api/docs/com/google/common/cache/CacheBuilder.html).
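The background-shutdown suggestion above (so that removing X endpoints does not stall the caller for up to 10*X seconds) could look roughly like this JDK-only sketch. `ClosableChannel`, `BackgroundClosingCache` and `GatedChannel` are hypothetical names: `ClosableChannel` mirrors only the three `ManagedChannel` methods that `shutdownChannel` calls, and a plain `ConcurrentHashMap` stands in for the Guava or Caffeine cache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical subset of the ManagedChannel shutdown API used by shutdownChannel.
interface ClosableChannel {
  void shutdown();

  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

  void shutdownNow();
}

// Removal hands the slow shutdown to a background executor instead of the caller.
final class BackgroundClosingCache {
  private final Map<String, ClosableChannel> channels = new ConcurrentHashMap<>();
  private final Function<String, ClosableChannel> channelFactory;
  private final ExecutorService closer;

  BackgroundClosingCache(Function<String, ClosableChannel> channelFactory, ExecutorService closer) {
    this.channelFactory = channelFactory;
    this.closer = closer;
  }

  static ExecutorService newCloserExecutor() {
    return Executors.newSingleThreadExecutor();
  }

  ClosableChannel get(String address) {
    return channels.computeIfAbsent(address, channelFactory);
  }

  /** Drops the entry and returns at once; the up-to-10s shutdown runs on {@code closer}. */
  void remove(String address) {
    ClosableChannel removed = channels.remove(address);
    if (removed != null) {
      closer.execute(() -> shutdownChannel(removed));
    }
  }

  // Same sequence as shutdownChannel in the diff: graceful, bounded wait, then forced.
  private static void shutdownChannel(ClosableChannel channel) {
    channel.shutdown();
    try {
      channel.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    channel.shutdownNow();
  }
}

// Hypothetical test double whose graceful termination blocks until released.
final class GatedChannel implements ClosableChannel {
  final CountDownLatch release = new CountDownLatch(1);
  private final CountDownLatch closed = new CountDownLatch(1);

  @Override
  public void shutdown() {}

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return release.await(timeout, unit);
  }

  @Override
  public void shutdownNow() {
    closed.countDown();
  }

  boolean isClosed() {
    return closed.getCount() == 0;
  }

  boolean awaitClosed(long millis) {
    try {
      return closed.await(millis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With Caffeine, a similar effect should come from its asynchronous removal listeners, so `invalidate` would not wait for `shutdownChannel`. Unlike the diff, this sketch restores the interrupt flag instead of only logging.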
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on PR #30425:
URL: https://github.com/apache/beam/pull/30425#issuecomment-1979741093

Back to you @scwhittle, thanks!
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1513573432

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java: ##
@@ -250,6 +257,22 @@ public GetWorkBudget remainingBudget() {
         .apply(currentInflightBudget);
   }
 
+  @Override
+  public synchronized void close() {
+    super.close();
+    if (!rpcChannel.isShutdown()) {

Review Comment:
Yeah, this is what I was initially thinking, but I opted to use the cache and handle the closing of the channels in `StreamingEngineClient`. I think the ChannelCache encapsulates the logic well and makes it more testable?
Re: [PR] add a way for channels to be closed manually [beam]
scwhittle commented on code in PR #30425: URL: https://github.com/apache/beam/pull/30425#discussion_r1510885621

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##
@@ -250,6 +257,22 @@ public GetWorkBudget remainingBudget() {
         .apply(currentInflightBudget);
   }
 
+  @Override
+  public synchronized void close() {
+    super.close();
+    if (!rpcChannel.isShutdown()) {

Review Comment: It seems like this could shut down a channel that is still in the cache? The cache should be responsible for shutting down only the channels it is sure are no longer being used. If we need to track who is using the endpoint, do we need the cache at all? Instead, `StreamingEngineClient` could cache the channels internally and shut them down when the worker endpoints change.
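A minimal sketch of the alternative described in this review comment, assuming `StreamingEngineClient` receives the full set of worker endpoints with each metadata update. `EndpointChannels`, `onEndpointsChanged`, and the factory/closer parameters are hypothetical names; the generic types `K` and `C` stand in for `WindmillServiceAddress` and `ManagedChannel`. The owner creates a channel when an endpoint appears and closes it only when the endpoint disappears, so closing one stream never shuts down a channel that is still in use.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Illustrative owner of per-endpoint channels, keyed by endpoint address. */
final class EndpointChannels<K, C> {
  private final Map<K, C> channels = new HashMap<>();
  private final Function<K, C> channelFactory;
  private final Consumer<C> channelCloser;

  EndpointChannels(Function<K, C> channelFactory, Consumer<C> channelCloser) {
    this.channelFactory = channelFactory;
    this.channelCloser = channelCloser;
  }

  /**
   * Applies a new endpoint set from worker metadata and returns a snapshot of the live channels.
   * Synchronized so concurrent metadata updates are applied one at a time.
   */
  synchronized Map<K, C> onEndpointsChanged(Set<K> newEndpoints) {
    // Close channels whose endpoints are no longer in the metadata.
    Set<K> removed = new HashSet<>(channels.keySet());
    removed.removeAll(newEndpoints);
    for (K endpoint : removed) {
      channelCloser.accept(channels.remove(endpoint));
    }
    // Reuse channels for surviving endpoints; create channels for new ones.
    for (K endpoint : newEndpoints) {
      channels.computeIfAbsent(endpoint, channelFactory);
    }
    return new HashMap<>(channels);
  }
}
```

Because `channelCloser` runs while the lock is held, it should not block; in line with the review comment about not blocking the caller, it could hand the actual shutdown to a background executor.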
Re: [PR] add a way for channels to be closed manually [beam]
github-actions[bot] commented on PR #30425: URL: https://github.com/apache/beam/pull/30425#issuecomment-1972055861 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] add a way for channels to be closed manually [beam]
m-trieu commented on PR #30425: URL: https://github.com/apache/beam/pull/30425#issuecomment-1972054383 R: @scwhittle This is for reusing channels. It matters more on the direct path, since we close streams (and their backing channels to the backend workers) when we receive new metadata. Thanks!
Re: [PR] add a way for channels to be closed manually [beam]
github-actions[bot] commented on PR #30425: URL: https://github.com/apache/beam/pull/30425#issuecomment-1965332725 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`