This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c50e3fc  [BEAM-6488] Portable Flink runner support for running cross-language transforms (#7709)
c50e3fc is described below

commit c50e3fc96020125c1afd3afb09bca500ee110987
Author: Heejong Lee <heej...@gmail.com>
AuthorDate: Fri Feb 8 20:31:44 2019 -0800

    [BEAM-6488] Portable Flink runner support for running cross-language transforms (#7709)
    
    Multi-language support in DefaultJobBundleFactory
---
 .../control/DefaultJobBundleFactory.java           | 199 ++++++++++++---------
 .../control/DefaultJobBundleFactoryTest.java       |  28 ++-
 2 files changed, 133 insertions(+), 94 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 0764aa0..5881f4c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.fnexecution.control;
 
+import com.google.auto.value.AutoValue;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -66,25 +66,17 @@ import org.slf4j.LoggerFactory;
  * EnvironmentFactory} for environment management. Note that returned {@link 
StageBundleFactory
  * stage bundle factories} are not thread-safe. Instead, a new stage factory 
should be created for
  * each client. {@link DefaultJobBundleFactory} initializes the Environment 
lazily when the forStage
- * is called for a stage. This factory is not capable of handling mixed types 
of environment.
+ * is called for a stage.
  */
 @ThreadSafe
 public class DefaultJobBundleFactory implements JobBundleFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJobBundleFactory.class);
 
-  private final IdGenerator stageIdGenerator;
   private final LoadingCache<Environment, WrappedSdkHarnessClient> 
environmentCache;
-  // Using environment as the initialization marker.
-  private Environment environment;
-  private ExecutorService executor;
-  private GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  private GrpcFnServer<GrpcLoggingService> loggingServer;
-  private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  private GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-  private GrpcFnServer<GrpcDataService> dataServer;
-  private GrpcFnServer<GrpcStateService> stateServer;
-  private MapControlClientPool clientPool;
-  private EnvironmentFactory environmentFactory;
+  private final Map<String, EnvironmentFactory.Provider> 
environmentFactoryProviderMap;
+  private final ExecutorService executor;
+  private final MapControlClientPool clientPool;
+  private final IdGenerator stageIdGenerator;
 
   public static DefaultJobBundleFactory create(
       JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> 
environmentFactoryProviderMap) {
@@ -94,42 +86,42 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
   DefaultJobBundleFactory(
       JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> 
environmentFactoryMap) {
     IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
+    this.environmentFactoryProviderMap = environmentFactoryMap;
+    this.executor = Executors.newCachedThreadPool();
+    this.clientPool = MapControlClientPool.create();
     this.stageIdGenerator = stageIdGenerator;
     this.environmentCache =
-        createEnvironmentCache(
-            environment -> {
-              synchronized (this) {
-                checkAndInitialize(jobInfo, environmentFactoryMap, 
environment);
-              }
-              return environmentFactory.createEnvironment(environment);
-            });
+        createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, 
serverFactory));
   }
 
   @VisibleForTesting
   DefaultJobBundleFactory(
-      EnvironmentFactory environmentFactory,
+      Map<String, EnvironmentFactory.Provider> environmentFactoryMap,
       IdGenerator stageIdGenerator,
       GrpcFnServer<FnApiControlClientPoolService> controlServer,
       GrpcFnServer<GrpcLoggingService> loggingServer,
       GrpcFnServer<ArtifactRetrievalService> retrievalServer,
       GrpcFnServer<StaticGrpcProvisionService> provisioningServer,
       GrpcFnServer<GrpcDataService> dataServer,
-      GrpcFnServer<GrpcStateService> stateServer)
-      throws Exception {
+      GrpcFnServer<GrpcStateService> stateServer) {
+    this.environmentFactoryProviderMap = environmentFactoryMap;
     this.executor = Executors.newCachedThreadPool();
+    this.clientPool = MapControlClientPool.create();
     this.stageIdGenerator = stageIdGenerator;
-    this.controlServer = controlServer;
-    this.loggingServer = loggingServer;
-    this.retrievalServer = retrievalServer;
-    this.provisioningServer = provisioningServer;
-    this.dataServer = dataServer;
-    this.stateServer = stateServer;
-    this.environmentCache =
-        createEnvironmentCache(env -> 
environmentFactory.createEnvironment(env));
+    ServerInfo serverInfo =
+        new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
+            .setControlServer(controlServer)
+            .setLoggingServer(loggingServer)
+            .setRetrievalServer(retrievalServer)
+            .setProvisioningServer(provisioningServer)
+            .setDataServer(dataServer)
+            .setStateServer(stateServer)
+            .build();
+    this.environmentCache = createEnvironmentCache(serverFactory -> 
serverInfo);
   }
 
   private LoadingCache<Environment, WrappedSdkHarnessClient> 
createEnvironmentCache(
-      ThrowingFunction<Environment, RemoteEnvironment> environmentCreator) {
+      ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator) {
     return CacheBuilder.newBuilder()
         .removalListener(
             (RemovalNotification<Environment, WrappedSdkHarnessClient> 
notification) -> {
@@ -145,8 +137,21 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
             new CacheLoader<Environment, WrappedSdkHarnessClient>() {
               @Override
               public WrappedSdkHarnessClient load(Environment environment) 
throws Exception {
+                EnvironmentFactory.Provider environmentFactoryProvider =
+                    environmentFactoryProviderMap.get(environment.getUrn());
+                ServerFactory serverFactory = 
environmentFactoryProvider.getServerFactory();
+                ServerInfo serverInfo = serverInfoCreator.apply(serverFactory);
+
+                EnvironmentFactory environmentFactory =
+                    environmentFactoryProvider.createEnvironmentFactory(
+                        serverInfo.getControlServer(),
+                        serverInfo.getLoggingServer(),
+                        serverInfo.getRetrievalServer(),
+                        serverInfo.getProvisioningServer(),
+                        clientPool,
+                        stageIdGenerator);
                 return WrappedSdkHarnessClient.wrapping(
-                    environmentCreator.apply(environment), dataServer);
+                    environmentFactory.createEnvironment(environment), 
serverInfo);
               }
             });
   }
@@ -161,12 +166,13 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
           ProcessBundleDescriptors.fromExecutableStage(
               stageIdGenerator.getId(),
               executableStage,
-              dataServer.getApiServiceDescriptor(),
-              stateServer.getApiServiceDescriptor());
+              
wrappedClient.getServerInfo().getDataServer().getApiServiceDescriptor(),
+              
wrappedClient.getServerInfo().getStateServer().getApiServiceDescriptor());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    return SimpleStageBundleFactory.create(wrappedClient, 
processBundleDescriptor, stateServer);
+    return SimpleStageBundleFactory.create(
+        wrappedClient, processBundleDescriptor, 
wrappedClient.getServerInfo().getStateServer());
   }
 
   @Override
@@ -176,14 +182,6 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
     environmentCache.invalidateAll();
     environmentCache.cleanUp();
 
-    // Tear down common servers.
-    stateServer.close();
-    dataServer.close();
-    controlServer.close();
-    loggingServer.close();
-    retrievalServer.close();
-    provisioningServer.close();
-
     executor.shutdown();
   }
 
@@ -269,85 +267,114 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
 
     private final RemoteEnvironment environment;
     private final SdkHarnessClient client;
+    private final ServerInfo serverInfo;
 
-    static WrappedSdkHarnessClient wrapping(
-        RemoteEnvironment environment, GrpcFnServer<GrpcDataService> 
dataServer) {
+    static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, 
ServerInfo serverInfo) {
       SdkHarnessClient client =
           SdkHarnessClient.usingFnApiClient(
-              environment.getInstructionRequestHandler(), 
dataServer.getService());
-      return new WrappedSdkHarnessClient(environment, client);
+              environment.getInstructionRequestHandler(), 
serverInfo.getDataServer().getService());
+      return new WrappedSdkHarnessClient(environment, client, serverInfo);
     }
 
-    private WrappedSdkHarnessClient(RemoteEnvironment environment, 
SdkHarnessClient client) {
+    private WrappedSdkHarnessClient(
+        RemoteEnvironment environment, SdkHarnessClient client, ServerInfo 
serverInfo) {
       this.environment = environment;
       this.client = client;
+      this.serverInfo = serverInfo;
     }
 
     SdkHarnessClient getClient() {
       return client;
     }
 
+    ServerInfo getServerInfo() {
+      return serverInfo;
+    }
+
     @Override
     public void close() throws Exception {
       try (AutoCloseable envCloser = environment) {
         // Wrap resources in try-with-resources to ensure all are cleaned up.
       }
+      try (AutoCloseable stateServer = serverInfo.getStateServer();
+          AutoCloseable dateServer = serverInfo.getDataServer();
+          AutoCloseable controlServer = serverInfo.getControlServer();
+          AutoCloseable loggingServer = serverInfo.getLoggingServer();
+          AutoCloseable retrievalServer = serverInfo.getRetrievalServer();
+          AutoCloseable provisioningServer = 
serverInfo.getProvisioningServer()) {}
       // TODO: Wait for executor shutdown?
     }
   }
 
-  @GuardedBy("this")
-  private void checkAndInitialize(
-      JobInfo jobInfo,
-      Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap,
-      Environment environment)
+  private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory 
serverFactory)
       throws IOException {
-    Preconditions.checkNotNull(environment, "Environment can not be null");
-    if (this.environment != null) {
-      Preconditions.checkArgument(
-          this.environment.getUrn().equals(environment.getUrn()),
-          "Unsupported: Mixing environment types (%s, %s) is not supported for 
a job.",
-          this.environment.getUrn(),
-          environment.getUrn());
-      // Nothing to do. Already initialized.
-      return;
-    }
-
-    EnvironmentFactory.Provider environmentFactoryProvider =
-        environmentFactoryProviderMap.get(environment.getUrn());
-    ServerFactory serverFactory = 
environmentFactoryProvider.getServerFactory();
+    Preconditions.checkNotNull(serverFactory, "serverFactory can not be null");
 
-    this.clientPool = MapControlClientPool.create();
-    this.executor = Executors.newCachedThreadPool();
-    this.controlServer =
+    GrpcFnServer<FnApiControlClientPoolService> controlServer =
         GrpcFnServer.allocatePortAndCreateFor(
             FnApiControlClientPoolService.offeringClientsToPool(
                 clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
             serverFactory);
-    this.loggingServer =
+    GrpcFnServer<GrpcLoggingService> loggingServer =
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
-    this.retrievalServer =
+    GrpcFnServer<ArtifactRetrievalService> retrievalServer =
         GrpcFnServer.allocatePortAndCreateFor(
             BeamFileSystemArtifactRetrievalService.create(), serverFactory);
-    this.provisioningServer =
+    GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
         GrpcFnServer.allocatePortAndCreateFor(
             StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), 
serverFactory);
-    this.dataServer =
+    GrpcFnServer<GrpcDataService> dataServer =
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcDataService.create(executor, 
OutboundObserverFactory.serverDirect()),
             serverFactory);
-    this.stateServer =
+    GrpcFnServer<GrpcStateService> stateServer =
         GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), 
serverFactory);
 
-    this.environmentFactory =
-        environmentFactoryProvider.createEnvironmentFactory(
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer,
-            clientPool,
-            stageIdGenerator);
-    this.environment = environment;
+    ServerInfo serverInfo =
+        new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
+            .setControlServer(controlServer)
+            .setLoggingServer(loggingServer)
+            .setRetrievalServer(retrievalServer)
+            .setProvisioningServer(provisioningServer)
+            .setDataServer(dataServer)
+            .setStateServer(stateServer)
+            .build();
+    return serverInfo;
+  }
+
+  /** A container for EnvironmentFactory and its corresponding Grpc servers. */
+  @AutoValue
+  public abstract static class ServerInfo {
+    abstract GrpcFnServer<FnApiControlClientPoolService> getControlServer();
+
+    abstract GrpcFnServer<GrpcLoggingService> getLoggingServer();
+
+    abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer();
+
+    abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer();
+
+    abstract GrpcFnServer<GrpcDataService> getDataServer();
+
+    abstract GrpcFnServer<GrpcStateService> getStateServer();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder 
setControlServer(GrpcFnServer<FnApiControlClientPoolService> server);
+
+      abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> 
server);
+
+      abstract Builder 
setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> server);
+
+      abstract Builder 
setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> server);
+
+      abstract Builder setDataServer(GrpcFnServer<GrpcDataService> server);
+
+      abstract Builder setStateServer(GrpcFnServer<GrpcStateService> server);
+
+      abstract ServerInfo build();
+    }
   }
 }
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
index 5c90c31..20c7578 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,6 +81,15 @@ public class DefaultJobBundleFactoryTest {
   private final IdGenerator stageIdGenerator = 
IdGenerators.incrementingLongs();
   private final InstructionResponse instructionResponse =
       
InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
+  private final EnvironmentFactory.Provider envFactoryProvider =
+      (GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+          GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+          GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+          GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+          ControlClientPool clientPool,
+          IdGenerator idGenerator) -> envFactory;
+  private final Map<String, EnvironmentFactory.Provider> envFactoryProviderMap 
=
+      ImmutableMap.of(environment.getUrn(), envFactoryProvider);
 
   @Before
   public void setUpMocks() throws Exception {
@@ -100,7 +108,7 @@ public class DefaultJobBundleFactoryTest {
   public void createsCorrectEnvironment() throws Exception {
     try (DefaultJobBundleFactory bundleFactory =
         new DefaultJobBundleFactory(
-            envFactory,
+            envFactoryProviderMap,
             stageIdGenerator,
             controlServer,
             loggingServer,
@@ -164,7 +172,7 @@ public class DefaultJobBundleFactoryTest {
       verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentAA);
 
       bundleFactory.forStage(getExecutableStage(environmentAA));
-      verify(environmentProviderFactoryA, Mockito.times(1))
+      verify(environmentProviderFactoryA, Mockito.times(2))
           .createEnvironmentFactory(any(), any(), any(), any(), any(), any());
       verify(environmentProviderFactoryB, Mockito.times(0))
           .createEnvironmentFactory(any(), any(), any(), any(), any(), any());
@@ -174,7 +182,7 @@ public class DefaultJobBundleFactoryTest {
   }
 
   @Test
-  public void failedCreatingMultipleEnvironmentFromMultipleTypes() throws 
Exception {
+  public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception {
     ServerFactory serverFactory = ServerFactory.createDefault();
 
     Environment environmentA = 
Environment.newBuilder().setUrn("env:urn:a").build();
@@ -206,16 +214,17 @@ public class DefaultJobBundleFactoryTest {
             JobInfo.create("testJob", "testJob", "token", 
Struct.getDefaultInstance()),
             environmentFactoryProviderMap)) {
       bundleFactory.forStage(getExecutableStage(environmentB));
-      thrown.expectCause(Matchers.any(IllegalArgumentException.class));
       bundleFactory.forStage(getExecutableStage(environmentA));
     }
+    verify(envFactoryA).createEnvironment(environmentA);
+    verify(envFactoryB).createEnvironment(environmentB);
   }
 
   @Test
   public void closesEnvironmentOnCleanup() throws Exception {
     DefaultJobBundleFactory bundleFactory =
         new DefaultJobBundleFactory(
-            envFactory,
+            envFactoryProviderMap,
             stageIdGenerator,
             controlServer,
             loggingServer,
@@ -233,7 +242,7 @@ public class DefaultJobBundleFactoryTest {
   public void cachesEnvironment() throws Exception {
     try (DefaultJobBundleFactory bundleFactory =
         new DefaultJobBundleFactory(
-            envFactory,
+            envFactoryProviderMap,
             stageIdGenerator,
             controlServer,
             loggingServer,
@@ -258,6 +267,9 @@ public class DefaultJobBundleFactoryTest {
     Environment envFoo = 
Environment.newBuilder().setUrn("dummy:urn:another").build();
     RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
     InstructionRequestHandler fooInstructionHandler = 
mock(InstructionRequestHandler.class);
+    Map<String, EnvironmentFactory.Provider> envFactoryProviderMapFoo =
+        ImmutableMap.of(
+            environment.getUrn(), envFactoryProvider, envFoo.getUrn(), 
envFactoryProvider);
     when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
     
when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
     // Don't bother creating a distinct instruction response because we don't 
use it here.
@@ -266,7 +278,7 @@ public class DefaultJobBundleFactoryTest {
 
     try (DefaultJobBundleFactory bundleFactory =
         new DefaultJobBundleFactory(
-            envFactory,
+            envFactoryProviderMapFoo,
             stageIdGenerator,
             controlServer,
             loggingServer,

Reply via email to