Move fn-execution org.apache.beam.harness.channel to org.apache.beam.sdk.fn.channel. The name 'harness' suggests the SDK harness package, which is not correct, as this package is used by both runners and the SDK harness.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbb3a730 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbb3a730 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbb3a730 Branch: refs/heads/master Commit: cbb3a73086dccf16d6e2f521d99036aea17a0e1c Parents: 5c74022 Author: Thomas Groh <tg...@google.com> Authored: Thu Nov 9 17:06:23 2017 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Wed Nov 22 16:16:41 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/fnexecution/ServerFactory.java | 2 +- .../runners/fnexecution/ServerFactoryTest.java | 2 +- .../harness/channel/ManagedChannelFactory.java | 82 -------------------- .../harness/channel/SocketAddressFactory.java | 64 --------------- .../beam/harness/channel/package-info.java | 22 ------ .../sdk/fn/channel/ManagedChannelFactory.java | 82 ++++++++++++++++++++ .../sdk/fn/channel/SocketAddressFactory.java | 64 +++++++++++++++ .../beam/sdk/fn/channel/package-info.java | 22 ++++++ .../channel/ManagedChannelFactoryTest.java | 71 ----------------- .../channel/SocketAddressFactoryTest.java | 56 ------------- .../fn/channel/ManagedChannelFactoryTest.java | 71 +++++++++++++++++ .../fn/channel/SocketAddressFactoryTest.java | 56 +++++++++++++ .../org/apache/beam/fn/harness/FnHarness.java | 2 +- 13 files changed, 298 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java index 918672a..93c787d 100644 --- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java @@ -28,7 +28,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import org.apache.beam.harness.channel.SocketAddressFactory; +import org.apache.beam.sdk.fn.channel.SocketAddressFactory; import org.apache.beam.model.pipeline.v1.Endpoints; /** http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java index b78e88a..e0d7bf9 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java @@ -38,7 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.beam.harness.channel.ManagedChannelFactory; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java deleted file mode 100644 index 187cfdb..0000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.channel; - -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.netty.NettyChannelBuilder; -import io.netty.channel.epoll.EpollDomainSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.unix.DomainSocketAddress; -import java.net.SocketAddress; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; - -/** - * A Factory which creates an underlying {@link ManagedChannel} implementation. 
- */ -public abstract class ManagedChannelFactory { - public static ManagedChannelFactory createDefault() { - return new Default(); - } - - public static ManagedChannelFactory createEpoll() { - io.netty.channel.epoll.Epoll.ensureAvailability(); - return new Epoll(); - } - - public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor); - - /** - * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address - * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an - * {@link EpollSocketChannel}. - */ - private static class Epoll extends ManagedChannelFactory { - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { - SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl()); - return NettyChannelBuilder.forAddress(address) - .channelType(address instanceof DomainSocketAddress - ? EpollDomainSocketChannel.class : EpollSocketChannel.class) - .eventLoopGroup(new EpollEventLoopGroup()) - .usePlaintext(true) - // Set the message size to max value here. The actual size is governed by the - // buffer size in the layers above. - .maxInboundMessageSize(Integer.MAX_VALUE) - .build(); - } - } - - /** - * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create - * instances. - */ - private static class Default extends ManagedChannelFactory { - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { - return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) - .usePlaintext(true) - // Set the message size to max value here. The actual size is governed by the - // buffer size in the layers above. 
- .maxInboundMessageSize(Integer.MAX_VALUE) - .build(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java deleted file mode 100644 index 5253291..0000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.channel; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.net.HostAndPort; -import io.netty.channel.unix.DomainSocketAddress; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -/** Creates a {@link SocketAddress} based upon a supplied string. 
*/ -public class SocketAddressFactory { - private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://"; - - /** - * Parse a {@link SocketAddress} from the given string. - */ - public static SocketAddress createFrom(String value) { - if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) { - // Unix Domain Socket address. - // Create the underlying file for the Unix Domain Socket. - String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length()); - File file = new File(filePath); - if (!file.isAbsolute()) { - throw new IllegalArgumentException("File path must be absolute: " + filePath); - } - try { - if (file.createNewFile()) { - // If this application created the file, delete it when the application exits. - file.deleteOnExit(); - } - } catch (IOException ex) { - throw new RuntimeException(ex); - } - // Create the SocketAddress referencing the file. - return new DomainSocketAddress(file); - } else { - // Standard TCP/IP address. - HostAndPort hostAndPort = HostAndPort.fromString(value); - checkArgument(hostAndPort.hasPort(), - "Address must be a unix:// path or be in the form host:port. Got: %s", value); - return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java deleted file mode 100644 index 2a33445..0000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * gRPC channel management. - */ -package org.apache.beam.harness.channel; http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java new file mode 100644 index 0000000..0a4a35d --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.channel; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; +import java.net.SocketAddress; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; + +/** + * A Factory which creates an underlying {@link ManagedChannel} implementation. + */ +public abstract class ManagedChannelFactory { + public static ManagedChannelFactory createDefault() { + return new Default(); + } + + public static ManagedChannelFactory createEpoll() { + io.netty.channel.epoll.Epoll.ensureAvailability(); + return new Epoll(); + } + + public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor); + + /** + * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address + * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an + * {@link EpollSocketChannel}. + */ + private static class Epoll extends ManagedChannelFactory { + @Override + public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl()); + return NettyChannelBuilder.forAddress(address) + .channelType(address instanceof DomainSocketAddress + ? 
EpollDomainSocketChannel.class : EpollSocketChannel.class) + .eventLoopGroup(new EpollEventLoopGroup()) + .usePlaintext(true) + // Set the message size to max value here. The actual size is governed by the + // buffer size in the layers above. + .maxInboundMessageSize(Integer.MAX_VALUE) + .build(); + } + } + + /** + * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create + * instances. + */ + private static class Default extends ManagedChannelFactory { + @Override + public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) + .usePlaintext(true) + // Set the message size to max value here. The actual size is governed by the + // buffer size in the layers above. + .maxInboundMessageSize(Integer.MAX_VALUE) + .build(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java new file mode 100644 index 0000000..090cc75 --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.channel; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.net.HostAndPort; +import io.netty.channel.unix.DomainSocketAddress; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** Creates a {@link SocketAddress} based upon a supplied string. */ +public class SocketAddressFactory { + private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://"; + + /** + * Parse a {@link SocketAddress} from the given string. + */ + public static SocketAddress createFrom(String value) { + if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) { + // Unix Domain Socket address. + // Create the underlying file for the Unix Domain Socket. + String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length()); + File file = new File(filePath); + if (!file.isAbsolute()) { + throw new IllegalArgumentException("File path must be absolute: " + filePath); + } + try { + if (file.createNewFile()) { + // If this application created the file, delete it when the application exits. + file.deleteOnExit(); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + // Create the SocketAddress referencing the file. + return new DomainSocketAddress(file); + } else { + // Standard TCP/IP address. + HostAndPort hostAndPort = HostAndPort.fromString(value); + checkArgument(hostAndPort.hasPort(), + "Address must be a unix:// path or be in the form host:port. 
Got: %s", value); + return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java new file mode 100644 index 0000000..3d2cb51 --- /dev/null +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * gRPC channel management. 
+ */ +package org.apache.beam.sdk.fn.channel; http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java deleted file mode 100644 index f73ed80..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.channel; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assume.assumeTrue; - -import io.grpc.ManagedChannel; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - - -/** Tests for {@link ManagedChannelFactory}. 
*/ -@RunWith(JUnit4.class) -public class ManagedChannelFactoryTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testDefaultChannel() { - Endpoints.ApiServiceDescriptor apiServiceDescriptor = - Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build(); - ManagedChannel channel = - ManagedChannelFactory.createDefault().forDescriptor(apiServiceDescriptor); - assertEquals("localhost:123", channel.authority()); - channel.shutdownNow(); - } - - @Test - public void testEpollHostPortChannel() { - assumeTrue(io.netty.channel.epoll.Epoll.isAvailable()); - Endpoints.ApiServiceDescriptor apiServiceDescriptor = - Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build(); - ManagedChannel channel = - ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor); - assertEquals("localhost:123", channel.authority()); - channel.shutdownNow(); - } - - @Test - public void testEpollDomainSocketChannel() throws Exception { - assumeTrue(io.netty.channel.epoll.Epoll.isAvailable()); - Endpoints.ApiServiceDescriptor apiServiceDescriptor = - Endpoints.ApiServiceDescriptor.newBuilder() - .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath()) - .build(); - ManagedChannel channel = - ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor); - assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), channel.authority()); - channel.shutdownNow(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java deleted file mode 100644 index 95a7d67..0000000 --- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.harness.channel; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import io.netty.channel.unix.DomainSocketAddress; -import java.io.File; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link SocketAddressFactory}. 
*/ -@RunWith(JUnit4.class) -public class SocketAddressFactoryTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testHostPortSocket() { - SocketAddress socketAddress = SocketAddressFactory.createFrom("localhost:123"); - assertThat(socketAddress, Matchers.instanceOf(InetSocketAddress.class)); - assertEquals("localhost", ((InetSocketAddress) socketAddress).getHostString()); - assertEquals(123, ((InetSocketAddress) socketAddress).getPort()); - } - - @Test - public void testDomainSocket() throws Exception { - File tmpFile = tmpFolder.newFile(); - SocketAddress socketAddress = SocketAddressFactory.createFrom( - "unix://" + tmpFile.getAbsolutePath()); - assertThat(socketAddress, Matchers.instanceOf(DomainSocketAddress.class)); - assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) socketAddress).path()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java new file mode 100644 index 0000000..fc08ff5 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.fn.channel; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; + +import io.grpc.ManagedChannel; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + +/** Tests for {@link ManagedChannelFactory}. */ +@RunWith(JUnit4.class) +public class ManagedChannelFactoryTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testDefaultChannel() { + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build(); + ManagedChannel channel = + ManagedChannelFactory.createDefault().forDescriptor(apiServiceDescriptor); + assertEquals("localhost:123", channel.authority()); + channel.shutdownNow(); + } + + @Test + public void testEpollHostPortChannel() { + assumeTrue(io.netty.channel.epoll.Epoll.isAvailable()); + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build(); + ManagedChannel channel = + ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor); + assertEquals("localhost:123", channel.authority()); + channel.shutdownNow(); + } + + @Test + public void testEpollDomainSocketChannel() throws Exception { + assumeTrue(io.netty.channel.epoll.Epoll.isAvailable()); + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + 
Endpoints.ApiServiceDescriptor.newBuilder() + .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath()) + .build(); + ManagedChannel channel = + ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor); + assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), channel.authority()); + channel.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java new file mode 100644 index 0000000..f702dd1 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.fn.channel; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import io.netty.channel.unix.DomainSocketAddress; +import java.io.File; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link SocketAddressFactory}. */ +@RunWith(JUnit4.class) +public class SocketAddressFactoryTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testHostPortSocket() { + SocketAddress socketAddress = SocketAddressFactory.createFrom("localhost:123"); + assertThat(socketAddress, Matchers.instanceOf(InetSocketAddress.class)); + assertEquals("localhost", ((InetSocketAddress) socketAddress).getHostString()); + assertEquals(123, ((InetSocketAddress) socketAddress).getPort()); + } + + @Test + public void testDomainSocket() throws Exception { + File tmpFile = tmpFolder.newFile(); + SocketAddress socketAddress = SocketAddressFactory.createFrom( + "unix://" + tmpFile.getAbsolutePath()); + assertThat(socketAddress, Matchers.instanceOf(DomainSocketAddress.class)); + assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) socketAddress).path()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index e1790fa..b644266 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -30,7 +30,7 @@ import 
org.apache.beam.fn.harness.fn.ThrowingFunction; import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; import org.apache.beam.fn.harness.stream.StreamObserverFactory; -import org.apache.beam.harness.channel.ManagedChannelFactory; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder;