Repository: reef Updated Branches: refs/heads/REEF-335 fed6fb770 -> ea249f7f4
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java new file mode 100644 index 0000000..e3f74c8 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.service; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.service.parameters.DriverClientCommand; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.driver.parameters.DriverIdleSources; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.RequiredImpl; +import org.apache.reef.tang.formats.RequiredParameter; + +/** + * Binds all driver bridge service handlers to the driver. + */ +@Private +public final class DriverServiceConfiguration extends ConfigurationModuleBuilder { + + public static final RequiredImpl<IDriverService> DRIVER_SERVICE_IMPL = new RequiredImpl<>(); + + public static final RequiredParameter<String> DRIVER_CLIENT_COMMAND = new RequiredParameter<>(); + + /** Configuration module that binds all driver handlers. */ + public static final ConfigurationModule CONF = new DriverServiceConfiguration() + .merge(DriverConfiguration.CONF) + .bindImplementation(IDriverService.class, DRIVER_SERVICE_IMPL) + .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND) + .bindSetEntry(DriverIdleSources.class, IDriverService.class) + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java new file mode 100644 index 0000000..cca2436 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.service; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.*; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Contains Java side event handlers that perform + * hand-off with the driver client side. 
+ */ +@Unit +public final class DriverServiceHandlers { + + private static final Logger LOG = Logger.getLogger(DriverServiceHandlers.class.getName()); + + private final IDriverService driverBridgeService; + + @Inject + private DriverServiceHandlers( + final IDriverService driverBridgeService) { + this.driverBridgeService = driverBridgeService; + } + + /** + * Job Driver is ready and the clock is set up: request the evaluators. + */ + final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + LOG.log(Level.INFO, "JavaBridge: Start Driver"); + DriverServiceHandlers.this.driverBridgeService.startHandler(startTime); + } + } + + /** + * Job Driver is is shutting down: write to the log. + */ + final class StopHandler implements EventHandler<StopTime> { + @Override + public void onNext(final StopTime stopTime) { + LOG.log(Level.INFO, "JavaBridge: Stop Driver"); + DriverServiceHandlers.this.driverBridgeService.stopHandler(stopTime); + } + } + + /** + * Receive notification that an Evaluator had been allocated, + * and submitTask a new Task in that Evaluator. 
+ */ + final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator eval) { + LOG.log(Level.INFO, "JavaBridge: Allocated Evaluator {0}", eval.getId()); + DriverServiceHandlers.this.driverBridgeService.allocatedEvaluatorHandler(eval); + } + } + + final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { + @Override + public void onNext(final CompletedEvaluator eval) { + LOG.log(Level.INFO, "JavaBridge: Completed Evaluator {0}", eval.getId()); + DriverServiceHandlers.this.driverBridgeService.completedEvaluatorHandler(eval); + } + } + + final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { + @Override + public void onNext(final FailedEvaluator eval) { + LOG.log(Level.INFO, "JavaBridge: Failed Evaluator {0}", eval.getId()); + DriverServiceHandlers.this.driverBridgeService.failedEvaluatorHandler(eval); + } + } + + /** + * Receive notification that the Context is active. + */ + final class ActiveContextHandler implements EventHandler<ActiveContext> { + @Override + public void onNext(final ActiveContext context) { + LOG.log(Level.INFO, "JavaBridge: Active Context {0}", context.getId()); + DriverServiceHandlers.this.driverBridgeService.activeContextHandler(context); + } + } + + /** + * Received notification that the Context is closed. + */ + final class ClosedContextHandler implements EventHandler<ClosedContext> { + @Override + public void onNext(final ClosedContext context) { + LOG.log(Level.INFO, "JavaBridge: Closed Context {0}", context.getId()); + DriverServiceHandlers.this.driverBridgeService.closedContextHandler(context); + } + } + + /** + * Received a message from the context. 
+ */ + final class ContextMessageHandler implements EventHandler<ContextMessage> { + @Override + public void onNext(final ContextMessage message) { + LOG.log(Level.INFO, "JavaBridge: Context Message id {0}", message.getId()); + DriverServiceHandlers.this.driverBridgeService.contextMessageHandler(message); + } + } + + /** + * Received notification that the Context failed. + */ + final class ContextFailedHandler implements EventHandler<FailedContext> { + @Override + public void onNext(final FailedContext context) { + LOG.log(Level.INFO, "JavaBridge: Context Failed {0}", context.getId()); + DriverServiceHandlers.this.driverBridgeService.failedContextHandler(context); + } + } + + /** + * Receive notification that the Task is running. + */ + final class RunningTaskHandler implements EventHandler<RunningTask> { + @Override + public void onNext(final RunningTask task) { + LOG.log(Level.INFO, "JavaBridge: Running Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.runningTaskHandler(task); + } + } + + /** + * Received notification that the Task failed. + */ + final class FailedTaskHandler implements EventHandler<FailedTask> { + @Override + public void onNext(final FailedTask task) { + LOG.log(Level.INFO, "JavaBridge: Failed Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.failedTaskHandler(task); + } + } + + /** + * Receive notification that the Task has completed successfully. + */ + final class CompletedTaskHandler implements EventHandler<CompletedTask> { + @Override + public void onNext(final CompletedTask task) { + LOG.log(Level.INFO, "JavaBridge: Completed Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.completedTaskHandler(task); + } + } + + /** + * Received notification that the Task was suspended. 
+ */ + final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { + @Override + public void onNext(final SuspendedTask task) { + LOG.log(Level.INFO, "JavaBridge: Suspended Task {0}", task.getId()); + DriverServiceHandlers.this.driverBridgeService.suspendedTaskHandler(task); + } + } + + /** + * Received a message from the task. + */ + final class TaskMessageHandler implements EventHandler<TaskMessage> { + @Override + public void onNext(final TaskMessage message) { + LOG.log(Level.INFO, "JavaBridge: Message from Task {0}", message.getId()); + DriverServiceHandlers.this.driverBridgeService.taskMessageHandler(message); + } + } + + /** + * Received a message from the client. + */ + final class ClientMessageHandler implements EventHandler<byte[]> { + @Override + public void onNext(final byte[] message) { + LOG.log(Level.INFO, "JavaBridge: Message from Client"); + DriverServiceHandlers.this.driverBridgeService.clientMessageHandler(message); + } + } + + /** + * Received a close event from the client. + */ + final class ClientCloseHandler implements EventHandler<Void> { + @Override + public void onNext(final Void value) { + LOG.log(Level.INFO, "JavaBridge: Close event from Client"); + DriverServiceHandlers.this.driverBridgeService.clientCloseHandler(); + } + } + + /** + * Received a close event with message. 
+ */ + final class ClientCloseWithMessageHandler implements EventHandler<byte[]> { + @Override + public void onNext(final byte[] message) { + LOG.log(Level.INFO, "JavaBridge: Close event with messages from Client"); + DriverServiceHandlers.this.driverBridgeService.clientCloseWithMessageHandler(message); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java new file mode 100644 index 0000000..fbafff9 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.service; + +import com.google.protobuf.util.JsonFormat; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.client.JavaDriverClientLauncher; +import org.apache.reef.bridge.examples.WindowsRuntimePathProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.bridge.service.grpc.GRPCDriverService; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; +import org.apache.reef.runtime.common.files.ClasspathProvider; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.common.files.RuntimePathProvider; +import org.apache.reef.runtime.common.files.UnixJVMPathProvider; +import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.tang.*; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.util.OSUtils; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver Service Launcher - main class. + */ +public final class DriverServiceLauncher { + + /** + * Standard Java logger. 
+ */ + private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName()); + + /** + * This class should not be instantiated. + */ + private DriverServiceLauncher() { + throw new RuntimeException("Do not instantiate this class!"); + } + + /** + * Parse command line arguments and create TANG configuration ready to be submitted to REEF. + * + * @param driverClientConfigurationProto containing which runtime to configure: local, yarn, azbatch + * @return (immutable) TANG Configuration object. + * @throws BindException if configuration commandLineInjector fails. + * @throws InjectionException if configuration commandLineInjector fails. + */ + private static Configuration getRuntimeConfiguration( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) + throws BindException { + switch (driverClientConfigurationProto.getRuntimeCase()) { + case LOCAL_RUNTIME: + return getLocalRuntimeConfiguration(driverClientConfigurationProto); + case YARN_RUNTIME: + return getYarnRuntimeConfiguration(driverClientConfigurationProto); + default: + throw new IllegalArgumentException("Unsupported runtime " + driverClientConfigurationProto.getRuntimeCase()); + } + } + + private static Configuration getLocalRuntimeConfiguration( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) + throws BindException { + LOG.log(Level.FINE, "JavaBridge: Running on the local runtime"); + return LocalRuntimeConfiguration.CONF + .build(); + } + + private static Configuration getYarnRuntimeConfiguration( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) + throws BindException { + LOG.log(Level.FINE, "JavaBridge: Running on YARN"); + return YarnClientConfiguration.CONF.build(); + } + + private static Configuration getDriverServiceConfiguration( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) { + // Set required parameters + ConfigurationModule 
driverServiceConfigurationModule = DriverServiceConfiguration.CONF + .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class) + .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND, + driverClientConfigurationProto.getDriverClientLaunchCommand()) + .set(DriverConfiguration.DRIVER_IDENTIFIER, driverClientConfigurationProto.getJobid()); + + // Set file dependencies + final List<String> localLibraries = new ArrayList<>(); + localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class)); + if (driverClientConfigurationProto.getLocalLibrariesCount() > 0) { + localLibraries.addAll(driverClientConfigurationProto.getLocalLibrariesList()); + } + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries); + if (driverClientConfigurationProto.getGlobalLibrariesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, + driverClientConfigurationProto.getGlobalLibrariesList()); + } + if (driverClientConfigurationProto.getLocalFilesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.LOCAL_FILES, + driverClientConfigurationProto.getLocalFilesList()); + } + if (driverClientConfigurationProto.getGlobalFilesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.GLOBAL_FILES, + driverClientConfigurationProto.getGlobalFilesList()); + } + // Setup driver resources + if (driverClientConfigurationProto.getCpuCores() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_CPU_CORES, driverClientConfigurationProto.getCpuCores()); + } + if (driverClientConfigurationProto.getMemoryMb() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_MEMORY, 
driverClientConfigurationProto.getMemoryMb()); + } + + // Setup handlers + final Set<ClientProtocol.DriverClientConfiguration.Handlers> handlerLabelSet = new HashSet<>(); + handlerLabelSet.addAll(driverClientConfigurationProto.getHandlerList()); + if (!handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.START.START)) { + throw new IllegalArgumentException("Start handler required"); + } else { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class) + .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_COMPLETED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_FAILED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_ACTIVE)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_CLOSED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + 
.set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_FAILED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_MESSAGE)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_RUNNING)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_COMPLETED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_FAILED)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_MESSAGE)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_MESSAGE)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class); + } + if 
(handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class); + } + if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE_WITH_MESSAGE)) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class); + } + + return setTcpPortRange(driverClientConfigurationProto, driverServiceConfigurationModule.build()); + } + + private static Configuration setTcpPortRange( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto, + final Configuration driverServiceConfiguration) { + JavaConfigurationBuilder configurationModuleBuilder = + Tang.Factory.getTang().newConfigurationBuilder(driverServiceConfiguration) + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class); + // Setup TCP constraints + if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeBegin.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin())); + } + if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeCount.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount())); + } + if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeCount.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount())); + } + return configurationModuleBuilder.build(); + } + + public static LauncherStatus submit( + final ClientProtocol.DriverClientConfiguration 
driverClientConfigurationProto, + final Configuration driverClientConfiguration) + throws InjectionException, IOException { + ClientProtocol.DriverClientConfiguration.Builder builder = + ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto); + final File driverClientConfigurationFile = new File("driverclient.conf"); + try { + // Write driver client configuration to a file + final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration); + final ConfigurationSerializer configurationSerializer = + driverClientInjector.getInstance(ConfigurationSerializer.class); + configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile); + + // Get runtime injector and piece together the launch command based on its classpath info + final Configuration runtimeConfiguration = getRuntimeConfiguration(driverClientConfigurationProto); + // Resolve OS Runtime Path Provider + final Configuration runtimeOSConfiguration = Configurations.merge( + Tang.Factory.getTang().newConfigurationBuilder() + .bind(RuntimePathProvider.class, + OSUtils.isWindows() ? 
WindowsRuntimePathProvider.class : UnixJVMPathProvider.class) + .build(), + runtimeConfiguration); + final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration); + final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class); + final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class); + final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class); + final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null) + .setConfigurationFilePaths( + Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" + + driverClientConfigurationFile.getName())) + .setJavaPath(runtimePathProvider.getPath()) + .setClassPath(classpathProvider.getEvaluatorClasspath()) + .build(); + final String cmd = StringUtils.join(launchCommand, ' '); + builder.setDriverClientLaunchCommand(cmd); + builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath()); + + + + // Configure driver service and launch the job + final Configuration driverServiceConfiguration = getDriverServiceConfiguration(builder.build()); + return DriverLauncher.getLauncher(runtimeOSConfiguration).run(driverServiceConfiguration); + } finally { + driverClientConfigurationFile.delete(); + } + } + + /** + * Main method that launches the REEF job. + * + * @param args command line parameters. 
+ */ + public static void main(final String[] args) { + try { + if (args.length != 1) { + LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + + " accepts single argument referencing a file that contains a client protocol buffer driver configuration"); + } + final String content; + try { + content = new String(Files.readAllBytes(Paths.get(args[0]))); + } catch (IOException e) { + throw new RuntimeException(e); + } + final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder = + ClientProtocol.DriverClientConfiguration.newBuilder(); + JsonFormat.parser() + .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry()) + .merge(content, driverClientConfigurationProtoBuilder); + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto = + driverClientConfigurationProtoBuilder.build(); + + final Configuration runtimeConfig = getRuntimeConfiguration(driverClientConfigurationProto); + final Configuration driverConfig = getDriverServiceConfiguration(driverClientConfigurationProto); + DriverLauncher.getLauncher(runtimeConfig).run(driverConfig); + LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid()); + } catch (final BindException | InjectionException | IOException ex) { + LOG.log(Level.SEVERE, "Job configuration error", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java new file mode 100644 index 0000000..612f00d --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/IDriverService.java @@ -0,0 +1,138 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.bridge.service;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

/**
 * Contract of a driver service: one callback per driver event, plus the
 * idleness reporting inherited from {@link DriverIdlenessSource}.
 */
public interface IDriverService extends DriverIdlenessSource {

  /**
   * Called with the start time event.
   * @param startTime the start event
   */
  void startHandler(StartTime startTime);

  /**
   * Called with the stop time event.
   * @param stopTime the stop event
   */
  void stopHandler(StopTime stopTime);

  /**
   * Called when an evaluator has been allocated.
   * @param eval the allocated evaluator
   */
  void allocatedEvaluatorHandler(AllocatedEvaluator eval);

  /**
   * Called when an evaluator has completed.
   * @param eval the evaluator that completed
   */
  void completedEvaluatorHandler(CompletedEvaluator eval);

  /**
   * Called when an evaluator has failed.
   * @param eval the evaluator that failed
   */
  void failedEvaluatorHandler(FailedEvaluator eval);

  /**
   * Called when a context has become active.
   * @param context the activated context
   */
  void activeContextHandler(ActiveContext context);

  /**
   * Called when a context has been closed.
   * @param context the context that closed
   */
  void closedContextHandler(ClosedContext context);

  /**
   * Called when a context sends a message.
   * @param message the message sent by the context
   */
  void contextMessageHandler(ContextMessage message);

  /**
   * Called when a context has failed.
   * @param context the context that failed
   */
  void failedContextHandler(FailedContext context);

  /**
   * Called when a task has started running.
   * @param task the task that is now running
   */
  void runningTaskHandler(RunningTask task);

  /**
   * Called when a task has failed.
   * @param task the task that failed
   */
  void failedTaskHandler(FailedTask task);

  /**
   * Called when a task has completed.
   * @param task the task that completed
   */
  void completedTaskHandler(CompletedTask task);

  /**
   * Called when a task has been suspended.
   * @param task the task that is suspended
   */
  void suspendedTaskHandler(SuspendedTask task);

  /**
   * Called when a task sends a message.
   * @param message the message sent by the task
   */
  void taskMessageHandler(TaskMessage message);

  /**
   * Called when the client sends a message.
   * @param message the raw bytes sent by the client
   */
  void clientMessageHandler(byte[] message);

  /**
   * Called when the client requests a close without a message.
   */
  void clientCloseHandler();

  /**
   * Called when the client requests a close and attaches a message.
   * @param message the raw bytes sent by the client
   */
  void clientCloseWithMessageHandler(byte[] message);
}
 http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java new file mode 100644 index 0000000..7c7de47 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/RuntimeNames.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.service; + +import org.apache.reef.annotations.audience.Private; + +/** + * Runtime names supported by the bridge. 
+ */ +@Private +public final class RuntimeNames { + public static final String LOCAL = "local"; + + public static final String YARN = "yarn"; + + public static final String AZBATCH = "azbatch"; + + private RuntimeNames() {} +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java new file mode 100644 index 0000000..3f7a131 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/GRPCDriverService.java @@ -0,0 +1,706 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.service.grpc; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.reef.bridge.service.DriverClientException; +import org.apache.reef.bridge.service.IDriverService; +import org.apache.reef.bridge.service.parameters.DriverClientCommand; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.bridge.proto.Void; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.*; +import org.apache.reef.driver.task.*; +import org.apache.reef.runtime.common.driver.context.EvaluatorContext; +import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl; +import org.apache.reef.runtime.common.driver.idle.IdleMessage; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.util.OSUtils; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * GRPC DriverBridgeService that interacts with higher-level languages. 
+ */ +public final class GRPCDriverService implements IDriverService { + private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName()); + + private Server server; + + private Process driverProcess; + + private DriverClientGrpc.DriverClientBlockingStub clientStub; + + private final Clock clock; + + private final EvaluatorRequestor evaluatorRequestor; + + private final JVMProcessFactory jvmProcessFactory; + + private final CLRProcessFactory clrProcessFactory; + + private final TcpPortProvider tcpPortProvider; + + private final String driverClientCommand; + + private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>(); + + private final Map<String, ActiveContext> activeContextMap = new HashMap<>(); + + private final Map<String, RunningTask> runningTaskMap = new HashMap<>(); + + private boolean stopped = false; + + @Inject + private GRPCDriverService( + final Clock clock, + final EvaluatorRequestor evaluatorRequestor, + final JVMProcessFactory jvmProcessFactory, + final CLRProcessFactory clrProcessFactory, + final TcpPortProvider tcpPortProvider, + @Parameter(DriverClientCommand.class) final String driverClientCommand) { + this.clock = clock; + this.jvmProcessFactory = jvmProcessFactory; + this.clrProcessFactory = clrProcessFactory; + this.evaluatorRequestor = evaluatorRequestor; + this.driverClientCommand = driverClientCommand; + this.tcpPortProvider = tcpPortProvider; + } + + private void start() throws IOException { + for (final Integer port : this.tcpPortProvider) { + try { + this.server = ServerBuilder.forPort(port) + .addService(new DriverBridgeServiceImpl()) + .build() + .start(); + LOG.info("Server started, listening on " + port); + break; + } catch (IOException e) { + LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port); + } + } + if (this.server == null || this.server.isTerminated()) { + throw new IOException("Unable to start gRPC server"); + } else { + final String cmd = this.driverClientCommand + " " 
+ this.server.getPort(); + final String cmdOs = OSUtils.isWindows() ? "cmd.exe /c \"" + cmd + "\"" : cmd; + final String cmdStd = cmdOs + " 1> driverclient.stdout 2> driverclient.stderr"; + this.driverProcess = Runtime.getRuntime().exec(cmdStd); + } + } + + private void stop() { + stop(null); + } + + private void stop(final Throwable t) { + if (!stopped) { + try { + if (server != null) { + this.server.shutdown(); + this.server = null; + } + if (this.driverProcess != null) { + this.driverProcess.destroy(); + this.driverProcess = null; + } + if (t != null) { + clock.stop(t); + } else { + clock.stop(); + } + } finally { + stopped = true; + } + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Determines if the driver process is still alive by + * testing for its exit value, which throws {@link IllegalThreadStateException} + * if process is still running. 
+ * @return true if driver process is alive, false otherwise + */ + private boolean driverProcessIsAlive() { + if (this.driverProcess != null) { + try { + this.driverProcess.exitValue(); + } catch (IllegalThreadStateException e) { + return true; + } + } + return false; + } + + private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) { + if (descriptor == null) { + return null; + } else { + return EvaluatorDescriptorInfo.newBuilder() + .setCores(descriptor.getNumberOfCores()) + .setMemory(descriptor.getMemory()) + .setRuntimeName(descriptor.getRuntimeName()) + .build(); + } + } + + @Override + public IdleMessage getIdleStatus() { + final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(null); + return new IdleMessage( + "Java Bridge DriverService", + idleStatus.getReason(), + idleStatus.getIsIdle()); + } + + @Override + public void startHandler(final StartTime startTime) { + try { + start(); + synchronized (this) { + // wait for driver client process to register + while (this.clientStub == null && driverProcessIsAlive()) { + this.wait(1000); // a second + } + if (this.clientStub != null) { + this.clientStub.startHandler( + StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build()); + } else { + stop(new IllegalStateException("Unable to start driver client")); + } + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + stop(); + } + } + + @Override + public void stopHandler(final StopTime stopTime) { + synchronized (this) { + try { + if (clientStub != null) { + this.clientStub.stopHandler( + StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build()); + } + } finally { + stop(); + } + } + + } + + @Override + public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) { + synchronized (this) { + this.allocatedEvaluatorMap.put(eval.getId(), eval); + this.clientStub.allocatedEvaluatorHandler( + EvaluatorInfo.newBuilder() + .setEvaluatorId(eval.getId()) + 
.setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor())) + .build()); + } + } + + @Override + public void completedEvaluatorHandler(final CompletedEvaluator eval) { + synchronized (this) { + this.allocatedEvaluatorMap.remove(eval.getId()); + this.clientStub.completedEvaluatorHandler( + EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build()); + } + } + + @Override + public void failedEvaluatorHandler(final FailedEvaluator eval) { + synchronized (this) { + this.allocatedEvaluatorMap.remove(eval.getId()); + this.clientStub.failedEvaluatorHandler( + EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build()); + } + } + + @Override + public void activeContextHandler(final ActiveContext context) { + synchronized (this) { + this.activeContextMap.put(context.getId(), context); + this.clientStub.activeContextHandler( + ContextInfo.newBuilder() + .setContextId(context.getId()) + .setEvaluatorId(context.getEvaluatorId()) + .setParentId( + context.getParentId().isPresent() ? + context.getParentId().get() : null) + .build()); + } + } + + @Override + public void closedContextHandler(final ClosedContext context) { + synchronized (this) { + this.activeContextMap.remove(context.getId()); + this.clientStub.closedContextHandler( + ContextInfo.newBuilder() + .setContextId(context.getId()) + .setEvaluatorId(context.getEvaluatorId()) + .setParentId(context.getParentContext().getId()) + .build()); + } + } + + @Override + public void failedContextHandler(final FailedContext context) { + synchronized (this) { + this.activeContextMap.remove(context.getId()); + this.clientStub.closedContextHandler( + ContextInfo.newBuilder() + .setContextId(context.getId()) + .setEvaluatorId(context.getEvaluatorId()) + .setParentId( + context.getParentContext().isPresent() ? 
+ context.getParentContext().get().getId() : null) + .build()); + } + } + + @Override + public void contextMessageHandler(final ContextMessage message) { + synchronized (this) { + this.clientStub.contextMessageHandler( + ContextMessageInfo.newBuilder() + .setContextId(message.getId()) + .setMessageSourceId(message.getMessageSourceID()) + .setSequenceNumber(message.getSequenceNumber()) + .setPayload(ByteString.copyFrom(message.get())) + .build()); + } + } + + @Override + public void runningTaskHandler(final RunningTask task) { + synchronized (this) { + this.runningTaskMap.put(task.getId(), task); + this.clientStub.runningTaskHandler( + TaskInfo.newBuilder() + .setTaskId(task.getId()) + .setContextId(task.getActiveContext().getId()) + .build()); + } + } + + @Override + public void failedTaskHandler(final FailedTask task) { + synchronized (this) { + this.runningTaskMap.remove(task.getId()); + this.clientStub.failedTaskHandler( + TaskInfo.newBuilder() + .setTaskId(task.getId()) + .setContextId( + task.getActiveContext().isPresent() ? 
+ task.getActiveContext().get().getId() : null) + .build()); + } + } + + @Override + public void completedTaskHandler(final CompletedTask task) { + synchronized (this) { + this.runningTaskMap.remove(task.getId()); + this.clientStub.completedTaskHandler( + TaskInfo.newBuilder() + .setTaskId(task.getId()) + .setContextId(task.getActiveContext().getId()) + .build()); + } + } + + @Override + public void suspendedTaskHandler(final SuspendedTask task) { + synchronized (this) { + this.runningTaskMap.remove(task.getId()); + this.clientStub.suspendedTaskHandler( + TaskInfo.newBuilder() + .setTaskId(task.getId()) + .setContextId(task.getActiveContext().getId()) + .build()); + } + } + + @Override + public void taskMessageHandler(final TaskMessage message) { + synchronized (this) { + this.clientStub.taskMessageHandler( + TaskMessageInfo.newBuilder() + .setTaskId(message.getId()) + .setContextId(message.getContextId()) + .setMessageSourceId(message.getMessageSourceID()) + .setSequenceNumber(message.getSequenceNumber()) + .setPayload(ByteString.copyFrom(message.get())) + .build()); + } + } + + @Override + public void clientMessageHandler(final byte[] message) { + synchronized (this) { + this.clientStub.clientMessageHandler( + ClientMessageInfo.newBuilder() + .setPayload(ByteString.copyFrom(message)) + .build()); + } + } + + @Override + public void clientCloseHandler() { + synchronized (this) { + this.clientStub.clientCloseHandler( + Void.newBuilder().build()); + } + } + + @Override + public void clientCloseWithMessageHandler(final byte[] message) { + synchronized (this) { + this.clientStub.clientCloseWithMessageHandler( + ClientMessageInfo.newBuilder() + .setPayload(ByteString.copyFrom(message)) + .build()); + } + } + + private final class DriverBridgeServiceImpl + extends DriverServiceGrpc.DriverServiceImplBase { + + @Override + public void registerDriverClient( + final DriverClientRegistration request, + final StreamObserver<Void> responseObserver) { + try { + final 
ManagedChannel channel = ManagedChannelBuilder + .forAddress(request.getHost(), request.getPort()) + .usePlaintext(true) + .build(); + synchronized (GRPCDriverService.this) { + GRPCDriverService.this.clientStub = DriverClientGrpc.newBlockingStub(channel); + GRPCDriverService.this.notifyAll(); + } + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void requestResources( + final ResourceRequest request, + final StreamObserver<Void> responseObserver) { + try { + synchronized (GRPCDriverService.this) { + EvaluatorRequest.Builder requestBuilder = GRPCDriverService.this.evaluatorRequestor.newRequest(); + requestBuilder.setNumber(request.getResourceCount()); + requestBuilder.setNumberOfCores(request.getCores()); + requestBuilder.setMemory(request.getMemorySize()); + requestBuilder.setRelaxLocality(request.getRelaxLocality()); + requestBuilder.setRuntimeName(request.getRuntimeName()); + if (request.getNodeNameListCount() > 0) { + requestBuilder.addNodeNames(request.getNodeNameListList()); + } + if (request.getRackNameListCount() > 0) { + for (final String rackName : request.getRackNameListList()) { + requestBuilder.addRackName(rackName); + } + } + GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build()); + } + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void shutdown( + final ShutdownRequest request, + final StreamObserver<Void> responseObserver) { + try { + synchronized (GRPCDriverService.this) { + if (request.getException() != null) { + GRPCDriverService.this.clock.stop( + new DriverClientException(request.getException().getMessage())); + } else { + GRPCDriverService.this.clock.stop(); + } + } + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void setAlarm( + final AlarmRequest request, + final StreamObserver<Void> responseObserver) { + try { + synchronized 
(GRPCDriverService.this) { + GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() { + @Override + public void onNext(final Alarm value) { + synchronized (GRPCDriverService.this) { + GRPCDriverService.this.clientStub.alarmTrigger( + AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build()); + } + } + }); + } + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void allocatedEvaluatorOp( + final AllocatedEvaluatorRequest request, + final StreamObserver<Void> responseObserver) { + try { + if (request.getEvaluatorConfiguration() == null) { + responseObserver.onError( + new IllegalArgumentException("Evaluator configuration required")); + } else if (request.getContextConfiguration() == null && request.getTaskConfiguration() == null) { + responseObserver.onError( + new IllegalArgumentException("Context and/or Task configuration required")); + } else { + synchronized (GRPCDriverService.this) { + if (!GRPCDriverService.this.allocatedEvaluatorMap.containsKey(request.getEvaluatorId())) { + responseObserver.onError( + new IllegalArgumentException("Unknown allocated evaluator " + request.getEvaluatorId())); + } + final AllocatedEvaluator evaluator = + GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId()); + if (request.getCloseEvaluator()) { + evaluator.close(); + } else { + if (request.getAddFilesCount() > 0) { + for (final String file : request.getAddFilesList()) { + evaluator.addFile(new File(file)); + } + } + if (request.getAddLibrariesCount() > 0) { + for (final String library : request.getAddLibrariesList()) { + evaluator.addLibrary(new File(library)); + } + } + if (request.getSetProcess() != null) { + final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest = + request.getSetProcess(); + switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) { + case JVM: + setJVMProcess(evaluator, processRequest); + break; + case 
CLR: + setCLRProcess(evaluator, processRequest); + break; + default: + throw new RuntimeException("Unknown evaluator process type"); + } + } + if (request.getContextConfiguration() != null && request.getTaskConfiguration() != null) { + // submit context and task + ((AllocatedEvaluatorImpl) evaluator).submitContextAndTask( + request.getEvaluatorConfiguration(), + request.getContextConfiguration(), + request.getTaskConfiguration()); + } else if (request.getContextConfiguration() != null) { + // submit context + ((AllocatedEvaluatorImpl) evaluator).submitContext( + request.getEvaluatorConfiguration(), + request.getContextConfiguration()); + } else if (request.getTaskConfiguration() != null) { + // submit task + ((AllocatedEvaluatorImpl) evaluator).submitTask( + request.getEvaluatorConfiguration(), + request.getTaskConfiguration()); + } else { + throw new RuntimeException("Missing check for required evaluator configurations"); + } + } + } + } + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void activeContextOp( + final ActiveContextRequest request, + final StreamObserver<Void> responseObserver) { + synchronized (GRPCDriverService.this) { + if (!GRPCDriverService.this.activeContextMap.containsKey(request.getContextId())) { + responseObserver.onError( + new IllegalArgumentException("Context does not exist with id " + request.getContextId())); + } else if (request.getNewContextRequest() != null && request.getNewTaskRequest() != null) { + responseObserver.onError( + new IllegalArgumentException("Context request can only contain one of a context or task configuration")); + + } + final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId()); + if (request.getOperationCase() == ActiveContextRequest.OperationCase.CLOSE_CONTEXT) { + if (request.getCloseContext()) { + try { + context.close(); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } 
else { + responseObserver.onError(new IllegalArgumentException("Close context operation not set to true")); + } + } else if (request.getOperationCase() == ActiveContextRequest.OperationCase.MESSAGE) { + if (request.getMessage() != null) { + try { + context.sendMessage(request.getMessage().toByteArray()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError(new IllegalArgumentException("Empty message on operation send message")); + } + } else if (request.getOperationCase() == ActiveContextRequest.OperationCase.NEW_CONTEXT_REQUEST) { + try { + ((EvaluatorContext) context).submitContext(request.getNewContextRequest()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else if (request.getOperationCase() == ActiveContextRequest.OperationCase.NEW_TASK_REQUEST) { + try { + ((EvaluatorContext) context).submitTask(request.getNewTaskRequest()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + } + } + + @Override + public void runningTaskOp( + final RunningTaskRequest request, + final StreamObserver<Void> responseObserver) { + synchronized (GRPCDriverService.this) { + if (!GRPCDriverService.this.runningTaskMap.containsKey(request.getTaskId())) { + responseObserver.onError( + new IllegalArgumentException("Task does not exist with id " + request.getTaskId())); + } + try { + final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId()); + if (request.getCloseTask()) { + if (request.getMessage() != null) { + task.close(request.getMessage().toByteArray()); + } else { + task.close(); + } + } else if (request.getMessage() != null) { + task.send(request.getMessage().toByteArray()); + } + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + } + + private void setCLRProcess( + final AllocatedEvaluator evaluator, + final AllocatedEvaluatorRequest.EvaluatorProcessRequest 
processRequest) { + final CLRProcess process = GRPCDriverService.this.clrProcessFactory.newEvaluatorProcess(); + if (processRequest.getMemoryMb() > 0) { + process.setMemory(processRequest.getMemoryMb()); + } + if (processRequest.getConfigurationFileName() != null) { + process.setConfigurationFileName(processRequest.getConfigurationFileName()); + } + if (processRequest.getStandardOut() != null) { + process.setStandardOut(processRequest.getStandardOut()); + } + if (processRequest.getStandardErr() != null) { + process.setStandardErr(processRequest.getStandardErr()); + } + evaluator.setProcess(process); + } + + private void setJVMProcess( + final AllocatedEvaluator evaluator, + final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) { + final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess(); + if (processRequest.getMemoryMb() > 0) { + process.setMemory(processRequest.getMemoryMb()); + } + if (processRequest.getConfigurationFileName() != null) { + process.setConfigurationFileName(processRequest.getConfigurationFileName()); + } + if (processRequest.getStandardOut() != null) { + process.setStandardOut(processRequest.getStandardOut()); + } + if (processRequest.getStandardErr() != null) { + process.setStandardErr(processRequest.getStandardErr()); + } + if (processRequest.getOptionsCount() > 0) { + for (final String option : processRequest.getOptionsList()) { + process.addOption(option); + } + } + evaluator.setProcess(process); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java new file mode 100644 index 0000000..a94328d --- 
/dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/grpc/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * GRPC implementation for driver bridge service. + */ +package org.apache.reef.bridge.service.grpc; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java new file mode 100644 index 0000000..25a8918 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * The Java-side of the CLR/Java bridge interop via gRPC/Protocol Buffers. + */ +package org.apache.reef.bridge.service; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java new file mode 100644 index 0000000..255f60d --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/DriverClientCommand.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.service.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * Named parameter (a string) holding the command used to launch the bridge driver process. + */ +@NamedParameter(doc = "The command to launch bridge driver process", + short_name = "command") +public final class DriverClientCommand implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java new file mode 100644 index 0000000..6a3b956 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/parameters/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Driver bridge service parameters. + */ +package org.apache.reef.bridge.service.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java index f52cc7f..d223717 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java @@ -29,7 +29,7 @@ import org.apache.reef.driver.evaluator.EvaluatorProcess; */ @Private @DriverSide -final class EvaluatorDescriptorImpl implements EvaluatorDescriptor { +public final class EvaluatorDescriptorImpl implements EvaluatorDescriptor { private final NodeDescriptor nodeDescriptor; private final int megaBytes; @@ -37,7 +37,7 @@ final class EvaluatorDescriptorImpl implements EvaluatorDescriptor { private EvaluatorProcess process; private final String runtimeName; - EvaluatorDescriptorImpl(final NodeDescriptor nodeDescriptor, + public EvaluatorDescriptorImpl(final NodeDescriptor nodeDescriptor, final int megaBytes, final int numberOfCores, final EvaluatorProcess process, 
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java index 5f81a5d..bd7784b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java @@ -108,7 +108,7 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder { if (classPath != null && !classPath.isEmpty()) { add("-classpath"); - add(classPath); + add("\"" + classPath + "\""); } propagateProperties(this, true, "proc_reef"); http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java index b6a7aa0..e4e7fb6 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java @@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter; /** * First tcp port number to try. 
*/ -@NamedParameter(doc = "First tcp port number to try", default_value = TcpPortRangeBegin.DEFAULT_VALUE) +@NamedParameter(doc = "First tcp port number to try", + short_name = "tcp_port_range_begin", default_value = TcpPortRangeBegin.DEFAULT_VALUE) public final class TcpPortRangeBegin implements Name<Integer> { public static final String DEFAULT_VALUE = "10000"; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java index ee5879d..23b65fb 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java @@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter; /** * Number of tcp ports in the range. 
*/ -@NamedParameter(doc = "Number of tcp ports in the range", default_value = TcpPortRangeCount.DEFAULT_VALUE) +@NamedParameter(doc = "Number of tcp ports in the range", + short_name = "tcp_port_range_count", default_value = TcpPortRangeCount.DEFAULT_VALUE) public final class TcpPortRangeCount implements Name<Integer> { public static final String DEFAULT_VALUE = "10000"; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java index 60cedea..20be605 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java @@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter; /** * Max number tries for port numbers. */ -@NamedParameter(doc = "Max number tries for port numbers", default_value = TcpPortRangeTryCount.DEFAULT_VALUE) +@NamedParameter(doc = "Max number tries for port numbers", + short_name = "tcp_port_range_try_count", default_value = TcpPortRangeTryCount.DEFAULT_VALUE) public final class TcpPortRangeTryCount implements Name<Integer> { public static final String DEFAULT_VALUE = "1000"; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8d58a93..bd35fbd 100644 --- a/pom.xml +++ b/pom.xml @@ -289,6 +289,8 @@ under the License. 
<exclude>REEF_STANDALONE_RUNTIME/**</exclude> <!-- Error logs --> <exclude>**/*.log</exclude> + <!-- Shading configuration --> + <exclude>**/dependency-reduced-pom.xml</exclude> <!-- The Visual Studio and Nuget build files --> <exclude>**/.vs/**</exclude> <exclude>**/*.sln*</exclude> @@ -298,6 +300,7 @@ under the License. <exclude>**/*.sdf*</exclude> <exclude>**/*.snk</exclude> <exclude>**/*.opendb</exclude> + <exclude>**/*.resx</exclude> <!-- The below are auto generated during the .Net build --> <exclude>**/bin/**</exclude> <exclude>**/obj/**</exclude> @@ -769,6 +772,7 @@ under the License. <module>lang/java/reef-applications</module> <module>lang/java/reef-bridge-client</module> <module>lang/java/reef-bridge-java</module> + <module>lang/java/reef-bridge-proto-java</module> <module>lang/java/reef-checkpoint</module> <module>lang/java/reef-common</module> <module>lang/java/reef-examples</module>