Move fn-execution's org.apache.beam.harness.channel to org.apache.beam.sdk.fn.channel

The 'harness' package name suggests these classes belong to the SDK harness,
which is not correct: they are used by both runners and the SDK harness.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbb3a730
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbb3a730
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbb3a730

Branch: refs/heads/master
Commit: cbb3a73086dccf16d6e2f521d99036aea17a0e1c
Parents: 5c74022
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 9 17:06:23 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 22 16:16:41 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/fnexecution/ServerFactory.java |  2 +-
 .../runners/fnexecution/ServerFactoryTest.java  |  2 +-
 .../harness/channel/ManagedChannelFactory.java  | 82 --------------------
 .../harness/channel/SocketAddressFactory.java   | 64 ---------------
 .../beam/harness/channel/package-info.java      | 22 ------
 .../sdk/fn/channel/ManagedChannelFactory.java   | 82 ++++++++++++++++++++
 .../sdk/fn/channel/SocketAddressFactory.java    | 64 +++++++++++++++
 .../beam/sdk/fn/channel/package-info.java       | 22 ++++++
 .../channel/ManagedChannelFactoryTest.java      | 71 -----------------
 .../channel/SocketAddressFactoryTest.java       | 56 -------------
 .../fn/channel/ManagedChannelFactoryTest.java   | 71 +++++++++++++++++
 .../fn/channel/SocketAddressFactoryTest.java    | 56 +++++++++++++
 .../org/apache/beam/fn/harness/FnHarness.java   |  2 +-
 13 files changed, 298 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 918672a..93c787d 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -28,7 +28,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import org.apache.beam.harness.channel.SocketAddressFactory;
+import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index b78e88a..e0d7bf9 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -38,7 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.harness.test.Consumer;
 import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
deleted file mode 100644
index 187cfdb..0000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.channel;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.channel.epoll.EpollDomainSocketChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.unix.DomainSocketAddress;
-import java.net.SocketAddress;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-
-/**
- * A Factory which creates an underlying {@link ManagedChannel} implementation.
- */
-public abstract class ManagedChannelFactory {
-  public static ManagedChannelFactory createDefault() {
-    return new Default();
-  }
-
-  public static ManagedChannelFactory createEpoll() {
-    io.netty.channel.epoll.Epoll.ensureAvailability();
-    return new Epoll();
-  }
-
-  public abstract ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor);
-
-  /**
-   * Creates a {@link ManagedChannel} backed by an {@link 
EpollDomainSocketChannel} if the address
-   * is a {@link DomainSocketAddress}. Otherwise creates a {@link 
ManagedChannel} backed by an
-   * {@link EpollSocketChannel}.
-   */
-  private static class Epoll extends ManagedChannelFactory {
-    @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
-      SocketAddress address = 
SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
-      return NettyChannelBuilder.forAddress(address)
-          .channelType(address instanceof DomainSocketAddress
-              ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
-          .eventLoopGroup(new EpollEventLoopGroup())
-          .usePlaintext(true)
-          // Set the message size to max value here. The actual size is 
governed by the
-          // buffer size in the layers above.
-          .maxInboundMessageSize(Integer.MAX_VALUE)
-          .build();
-    }
-  }
-
-  /**
-   * Creates a {@link ManagedChannel} relying on the {@link 
ManagedChannelBuilder} to create
-   * instances.
-   */
-  private static class Default extends ManagedChannelFactory {
-    @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
-      return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
-          .usePlaintext(true)
-          // Set the message size to max value here. The actual size is 
governed by the
-          // buffer size in the layers above.
-          .maxInboundMessageSize(Integer.MAX_VALUE)
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
deleted file mode 100644
index 5253291..0000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.channel;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.net.HostAndPort;
-import io.netty.channel.unix.DomainSocketAddress;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-/** Creates a {@link SocketAddress} based upon a supplied string. */
-public class SocketAddressFactory {
-  private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
-
-  /**
-   * Parse a {@link SocketAddress} from the given string.
-   */
-  public static SocketAddress createFrom(String value) {
-    if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
-      // Unix Domain Socket address.
-      // Create the underlying file for the Unix Domain Socket.
-      String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
-      File file = new File(filePath);
-      if (!file.isAbsolute()) {
-        throw new IllegalArgumentException("File path must be absolute: " + 
filePath);
-      }
-      try {
-        if (file.createNewFile()) {
-          // If this application created the file, delete it when the 
application exits.
-          file.deleteOnExit();
-        }
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-      // Create the SocketAddress referencing the file.
-      return new DomainSocketAddress(file);
-    } else {
-      // Standard TCP/IP address.
-      HostAndPort hostAndPort = HostAndPort.fromString(value);
-      checkArgument(hostAndPort.hasPort(),
-          "Address must be a unix:// path or be in the form host:port. Got: 
%s", value);
-      return new InetSocketAddress(hostAndPort.getHostText(), 
hostAndPort.getPort());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
deleted file mode 100644
index 2a33445..0000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * gRPC channel management.
- */
-package org.apache.beam.harness.channel;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
new file mode 100644
index 0000000..0a4a35d
--- /dev/null
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.channel;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.net.SocketAddress;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+
+/**
+ * A Factory which creates an underlying {@link ManagedChannel} implementation.
+ */
+public abstract class ManagedChannelFactory {
+  public static ManagedChannelFactory createDefault() {
+    return new Default();
+  }
+
+  public static ManagedChannelFactory createEpoll() {
+    io.netty.channel.epoll.Epoll.ensureAvailability();
+    return new Epoll();
+  }
+
+  public abstract ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor);
+
+  /**
+   * Creates a {@link ManagedChannel} backed by an {@link 
EpollDomainSocketChannel} if the address
+   * is a {@link DomainSocketAddress}. Otherwise creates a {@link 
ManagedChannel} backed by an
+   * {@link EpollSocketChannel}.
+   */
+  private static class Epoll extends ManagedChannelFactory {
+    @Override
+    public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
+      SocketAddress address = 
SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
+      return NettyChannelBuilder.forAddress(address)
+          .channelType(address instanceof DomainSocketAddress
+              ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
+          .eventLoopGroup(new EpollEventLoopGroup())
+          .usePlaintext(true)
+          // Set the message size to max value here. The actual size is 
governed by the
+          // buffer size in the layers above.
+          .maxInboundMessageSize(Integer.MAX_VALUE)
+          .build();
+    }
+  }
+
+  /**
+   * Creates a {@link ManagedChannel} relying on the {@link 
ManagedChannelBuilder} to create
+   * instances.
+   */
+  private static class Default extends ManagedChannelFactory {
+    @Override
+    public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
+      return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
+          .usePlaintext(true)
+          // Set the message size to max value here. The actual size is 
governed by the
+          // buffer size in the layers above.
+          .maxInboundMessageSize(Integer.MAX_VALUE)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java
new file mode 100644
index 0000000..090cc75
--- /dev/null
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.channel;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/** Creates a {@link SocketAddress} based upon a supplied string. */
+public class SocketAddressFactory {
+  private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
+
+  /**
+   * Parse a {@link SocketAddress} from the given string.
+   */
+  public static SocketAddress createFrom(String value) {
+    if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
+      // Unix Domain Socket address.
+      // Create the underlying file for the Unix Domain Socket.
+      String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
+      File file = new File(filePath);
+      if (!file.isAbsolute()) {
+        throw new IllegalArgumentException("File path must be absolute: " + 
filePath);
+      }
+      try {
+        if (file.createNewFile()) {
+          // If this application created the file, delete it when the 
application exits.
+          file.deleteOnExit();
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      // Create the SocketAddress referencing the file.
+      return new DomainSocketAddress(file);
+    } else {
+      // Standard TCP/IP address.
+      HostAndPort hostAndPort = HostAndPort.fromString(value);
+      checkArgument(hostAndPort.hasPort(),
+          "Address must be a unix:// path or be in the form host:port. Got: 
%s", value);
+      return new InetSocketAddress(hostAndPort.getHostText(), 
hostAndPort.getPort());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java
new file mode 100644
index 0000000..3d2cb51
--- /dev/null
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC channel management.
+ */
+package org.apache.beam.sdk.fn.channel;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
deleted file mode 100644
index f73ed80..0000000
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.channel;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
-
-import io.grpc.ManagedChannel;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-
-/** Tests for {@link ManagedChannelFactory}. */
-@RunWith(JUnit4.class)
-public class ManagedChannelFactoryTest {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testDefaultChannel() {
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        
Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
-    ManagedChannel channel =
-        
ManagedChannelFactory.createDefault().forDescriptor(apiServiceDescriptor);
-    assertEquals("localhost:123", channel.authority());
-    channel.shutdownNow();
-  }
-
-  @Test
-  public void testEpollHostPortChannel() {
-    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        
Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
-    ManagedChannel channel =
-        
ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor);
-    assertEquals("localhost:123", channel.authority());
-    channel.shutdownNow();
-  }
-
-  @Test
-  public void testEpollDomainSocketChannel() throws Exception {
-    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        Endpoints.ApiServiceDescriptor.newBuilder()
-            .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath())
-            .build();
-    ManagedChannel channel =
-        
ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor);
-    assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), 
channel.authority());
-    channel.shutdownNow();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
deleted file mode 100644
index 95a7d67..0000000
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.channel;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import io.netty.channel.unix.DomainSocketAddress;
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link SocketAddressFactory}. */
-@RunWith(JUnit4.class)
-public class SocketAddressFactoryTest {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testHostPortSocket() {
-    SocketAddress socketAddress = 
SocketAddressFactory.createFrom("localhost:123");
-    assertThat(socketAddress, Matchers.instanceOf(InetSocketAddress.class));
-    assertEquals("localhost", ((InetSocketAddress) 
socketAddress).getHostString());
-    assertEquals(123, ((InetSocketAddress) socketAddress).getPort());
-  }
-
-  @Test
-  public void testDomainSocket() throws Exception {
-    File tmpFile = tmpFolder.newFile();
-    SocketAddress socketAddress = SocketAddressFactory.createFrom(
-        "unix://" + tmpFile.getAbsolutePath());
-    assertThat(socketAddress, Matchers.instanceOf(DomainSocketAddress.class));
-    assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) 
socketAddress).path());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java
new file mode 100644
index 0000000..fc08ff5
--- /dev/null
+++ 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactoryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.channel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import io.grpc.ManagedChannel;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+/** Tests for {@link ManagedChannelFactory}. */
+@RunWith(JUnit4.class)
+public class ManagedChannelFactoryTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testDefaultChannel() {
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        
Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
+    ManagedChannel channel =
+        
ManagedChannelFactory.createDefault().forDescriptor(apiServiceDescriptor);
+    assertEquals("localhost:123", channel.authority());
+    channel.shutdownNow();
+  }
+
+  @Test
+  public void testEpollHostPortChannel() {
+    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        
Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
+    ManagedChannel channel =
+        
ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor);
+    assertEquals("localhost:123", channel.authority());
+    channel.shutdownNow();
+  }
+
+  @Test
+  public void testEpollDomainSocketChannel() throws Exception {
+    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath())
+            .build();
+    ManagedChannel channel =
+        
ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor);
+    assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), 
channel.authority());
+    channel.shutdownNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java
new file mode 100644
index 0000000..f702dd1
--- /dev/null
+++ 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/channel/SocketAddressFactoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.channel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SocketAddressFactory}. */
+@RunWith(JUnit4.class)
+public class SocketAddressFactoryTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testHostPortSocket() {
+    SocketAddress socketAddress = 
SocketAddressFactory.createFrom("localhost:123");
+    assertThat(socketAddress, Matchers.instanceOf(InetSocketAddress.class));
+    assertEquals("localhost", ((InetSocketAddress) 
socketAddress).getHostString());
+    assertEquals(123, ((InetSocketAddress) socketAddress).getPort());
+  }
+
+  @Test
+  public void testDomainSocket() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    SocketAddress socketAddress = SocketAddressFactory.createFrom(
+        "unix://" + tmpFile.getAbsolutePath());
+    assertThat(socketAddress, Matchers.instanceOf(DomainSocketAddress.class));
+    assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) 
socketAddress).path());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbb3a730/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index e1790fa..b644266 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -30,7 +30,7 @@ import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.harness.stream.StreamObserverFactory;
-import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder;

Reply via email to