http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala new file mode 100644 index 0000000..447b034 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.worker + +import java.io.File +import java.lang.management.ManagementFactory +import java.net.URL +import java.util.concurrent.{Executors, TimeUnit} + +import akka.actor.SupervisorStrategy.Stop +import akka.actor._ +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} +import org.apache.gearpump.cluster.AppMasterToWorker._ +import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig} +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig} +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.WorkerToAppMaster._ +import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher +import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig} +import org.apache.gearpump.jarstore.JarStoreClient +import org.apache.gearpump.metrics.Metrics.ReportMetrics +import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} +import org.apache.gearpump.util.ActorSystemBooter.Daemon +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig +import org.apache.gearpump.util.{TimeOutScheduler, _} +import org.slf4j.Logger + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} + +/** + * Worker is used to track the resource on single machine, it is like + * the node manager of YARN. 
+ * + * @param masterProxy masterProxy is used to resolve the master + */ +private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler { + private val systemConfig: Config = context.system.settings.config + + private val address = ActorUtil.getFullPath(context.system, self.path) + private var resource = Resource.empty + private var allocatedResources = Map[ActorRef, Resource]() + private var executorsInfo = Map[ActorRef, ExecutorSlots]() + private var id: WorkerId = WorkerId.unspecified + private val createdTime = System.currentTimeMillis() + private var masterInfo: MasterInfo = null + private var executorNameToActor = Map.empty[String, ActorRef] + private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher() + private val jarStoreClient = new JarStoreClient(systemConfig, context.system) + + private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) + private val resourceUpdateTimeoutMs = 30000 // Milliseconds + + private var totalSlots: Int = 0 + + val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) + var historyMetricsService: Option[ActorRef] = None + + override def receive: Receive = null + var LOG: Logger = LogUtil.getLogger(getClass) + + def service: Receive = + appMasterMsgHandler orElse + clientMessageHandler orElse + metricsService orElse + terminationWatch(masterInfo.master) orElse + ActorUtil.defaultMsgHandler(self) + + def metricsService: Receive = { + case query: QueryHistoryMetrics => + if (historyMetricsService.isEmpty) { + // Returns empty metrics so that we don't hang the UI + sender ! 
HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) + } else { + historyMetricsService.get forward query + } + } + + private var metricsInitialized = false + + val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) + + private def initializeMetrics(): Unit = { + // Registers jvm metrics + val metricsSetName = "worker" + WorkerId.render(id) + Metrics(context.system).register(new JvmMetricsSet(metricsSetName)) + + historyMetricsService = if (metricsEnabled) { + val historyMetricsService = { + context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig))) + } + + val metricsReportService = context.actorOf(Props( + new MetricsReporterService(Metrics(context.system)))) + historyMetricsService.tell(ReportMetrics, metricsReportService) + Some(historyMetricsService) + } else { + None + } + } + + def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = { + + // If master get disconnected, the WorkerRegistered may be triggered multiple times. + case WorkerRegistered(id, masterInfo) => + this.id = id + + // Adds the flag check, so that we don't re-initialize the metrics when worker re-register + // itself. + if (!metricsInitialized) { + initializeMetrics() + metricsInitialized = true + } + + this.masterInfo = masterInfo + timeoutTicker.cancel() + context.watch(masterInfo.master) + this.LOG = LogUtil.getLogger(getClass, worker = id) + LOG.info(s"Worker is registered. 
" + + s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....") + sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource), + resourceUpdateTimeoutMs, updateResourceTimeOut()) + context.become(service) + } + + private def updateResourceTimeOut(): Unit = { + LOG.error(s"Update worker resource time out") + } + + def appMasterMsgHandler: Receive = { + case shutdown@ShutdownExecutor(appId, executorId, reason: String) => + val actorName = ActorUtil.actorNameForExecutor(appId, executorId) + val executorToStop = executorNameToActor.get(actorName) + if (executorToStop.isDefined) { + LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " + + s"due to: $reason") + executorToStop.get.forward(shutdown) + } else { + LOG.error(s"Cannot find executor $actorName, ignore this message") + sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId") + } + case launch: LaunchExecutor => + LOG.info(s"$launch") + if (resource < launch.resource) { + sender ! 
ExecutorLaunchRejected("There is no free resource on this machine") + } else { + val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId) + + val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool, + jarStoreClient, executorProcLauncher)) + executorNameToActor += actorName -> executor + + resource = resource - launch.resource + allocatedResources = allocatedResources + (executor -> launch.resource) + + reportResourceToMaster() + executorsInfo += executor -> + ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots) + context.watch(executor) + } + case UpdateResourceFailed(reason, ex) => + LOG.error(reason) + context.stop(self) + case UpdateResourceSucceed => + LOG.info(s"Update resource succeed") + case GetWorkerData(workerId) => + val aliveFor = System.currentTimeMillis() - createdTime + val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath + val userDir = System.getProperty("user.dir") + sender ! WorkerData(WorkerSummary( + id, "active", + address, + aliveFor, + logDir, + executorsInfo.values.toArray, + totalSlots, + resource.slots, + userDir, + jvmName = ManagementFactory.getRuntimeMXBean().getName(), + resourceManagerContainerId = systemConfig.getString( + GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID), + historyMetricsConfig = getHistoryMetricsConfig) + ) + case ChangeExecutorResource(appId, executorId, usedResource) => + for (executor <- executorActorRef(appId, executorId); + allocatedResource <- allocatedResources.get(executor)) { + + allocatedResources += executor -> usedResource + resource = resource + allocatedResource - usedResource + reportResourceToMaster() + + if (usedResource == Resource(0)) { + executorsInfo -= executor + allocatedResources -= executor + // stop executor if there is no resource binded to it. + LOG.info(s"Shutdown executor $executorId because the resource used is zero") + executor ! 
ShutdownExecutor(appId, executorId, + "Shutdown executor because the resource used is zero") + } + } + } + + private def reportResourceToMaster(): Unit = { + sendMsgWithTimeOutCallBack(masterInfo.master, + ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut()) + } + + private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = { + val actorName = ActorUtil.actorNameForExecutor(appId, executorId) + executorNameToActor.get(actorName) + } + + def clientMessageHandler: Receive = { + case QueryWorkerConfig(workerId) => + if (this.id == workerId) { + sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) + } else { + sender ! WorkerConfig(ConfigFactory.empty) + } + } + + private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = { + repeatActionUtil( + seconds = timeOutSeconds, + action = () => { + masterProxy ! RegisterWorker(workerId) + }, + onTimeout = () => { + LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " + + s"seconds, abort and kill the worker...") + self ! PoisonPill + }) + } + + def terminationWatch(master: ActorRef): Receive = { + case Terminated(actor) => + if (actor.compareTo(master) == 0) { + // Parent master is down, no point to keep worker anymore. 
Let's make suicide to free + // resources + LOG.info(s"Master cannot be contacted, find a new master ...") + context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30))) + } else if (ActorUtil.isChildActorPath(self, actor)) { + // One executor is down, + LOG.info(s"Executor is down ${getExecutorName(actor)}") + + val allocated = allocatedResources.get(actor) + if (allocated.isDefined) { + resource = resource + allocated.get + executorsInfo -= actor + allocatedResources = allocatedResources - actor + sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource), + resourceUpdateTimeoutMs, updateResourceTimeOut()) + } + } + } + + private def getExecutorName(actorRef: ActorRef): Option[String] = { + executorNameToActor.find(_._2 == actorRef).map(_._1) + } + + private def getExecutorProcLauncher(): ExecutorProcessLauncher = { + val launcherClazz = Class.forName( + systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER)) + launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig) + .asInstanceOf[ExecutorProcessLauncher] + } + + import context.dispatcher + override def preStart(): Unit = { + LOG.info(s"RegisterNewWorker") + totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS) + this.resource = Resource(totalSlots) + masterProxy ! RegisterNewWorker + context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30))) + } + + private def registerTimeoutTicker(seconds: Int): Cancellable = { + repeatActionUtil(seconds, () => Unit, () => { + LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " + + s"abort and kill the worker...") + self ! 
PoisonPill + }) + } + + private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit) + : Cancellable = { + val cancelTimeout = context.system.scheduler.schedule(Duration.Zero, + Duration(2, TimeUnit.SECONDS))(action()) + val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout()) + new Cancellable { + def cancel(): Boolean = { + val result1 = cancelTimeout.cancel() + val result2 = cancelSuicide.cancel() + result1 && result2 + } + + def isCancelled: Boolean = { + cancelTimeout.isCancelled && cancelSuicide.isCancelled + } + } + } + + override def postStop(): Unit = { + LOG.info(s"Worker is going down....") + ioPool.shutdown() + context.system.terminate() + } +} + +private[cluster] object Worker { + + case class ExecutorResult(result: Try[Int]) + + class ExecutorWatcher( + launch: LaunchExecutor, + masterInfo: MasterInfo, + ioPool: ExecutionContext, + jarStoreClient: JarStoreClient, + procLauncher: ExecutorProcessLauncher) extends Actor { + import launch.{appId, executorId, resource} + + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId) + + val executorConfig: Config = { + val workerConfig = context.system.settings.config + + val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig => + Option(jvmConfig.executorAkkaConfig) + }.getOrElse(ConfigFactory.empty()) + + resolveExecutorConfig(workerConfig, submissionConfig) + } + + // For some config, worker has priority, for others, user Application submission config + // have priorities. 
+ private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = { + val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME) + .withoutPath(GEARPUMP_CLUSTER_MASTERS) + .withoutPath(GEARPUMP_HOME) + .withoutPath(GEARPUMP_LOG_DAEMON_DIR) + .withoutPath(GEARPUMP_LOG_APPLICATION_DIR) + .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS) + // Falls back to workerConfig + .withFallback(workerConfig) + + // Minimum supported akka.scheduler.tick-duration on Windows is 10ms + val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION) + val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) { + LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms") + config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10)) + } else { + config + } + + // Excludes reference.conf, and JVM properties.. + ClusterConfig.filterOutDefaultConfig(updatedConf) + } + + implicit val executorService = ioPool + + private val executorHandler = { + val ctx = launch.executorJvmConfig + + if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) { + new ExecutorHandler { + val exitPromise = Promise[Int]() + val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise))) + + override def destroy(): Unit = { + context.stop(app) + } + override def exitValue: Future[Int] = { + exitPromise.future + } + } + } else { + createProcess(ctx) + } + } + + private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = { + + val process = Future { + val jarPath = ctx.jar.map { appJar => + val tempFile = File.createTempFile(appJar.name, ".jar") + jarStoreClient.copyToLocalFile(tempFile, appJar.filePath) + val file = new URL("file:" + tempFile) + file.getFile + } + + val configFile = { + val configFile = File.createTempFile("gearpump", ".conf") + ClusterConfig.saveConfig(executorConfig, configFile) + val file = new URL("file:" + configFile) + file.getFile + } + + 
val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++ + ctx.classPath.map(path => expandEnviroment(path)) ++ + jarPath.map(Array(_)).getOrElse(Array.empty[String]) + + val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR) + val logArgs = List( + s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}", + s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}", + s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}", + s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}") + val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile") + + val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}") + + // Remote debug executor process + val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM) + val remoteDebugConfig = if (remoteDebugFlag) { + val availablePort = Util.findFreePort().get + List( + "-Xdebug", + s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n", + s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort" + ) + } else { + List.empty[String] + } + + val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC) + val verboseGCConfig = if (verboseGCFlag) { + List( + s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log", + "-verbose:gc", + "-XX:+PrintGCDetails", + "-XX:+PrintGCDateStamps", + "-XX:+PrintTenuringDistribution", + "-XX:+PrintGCApplicationConcurrentTime", + "-XX:+PrintGCApplicationStoppedTime" + ) + } else { + List.empty[String] + } + + val ipv4 = List(s"-D${PREFER_IPV4}=true") + + val options = ctx.jvmArguments ++ username ++ + logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs + + val process = procLauncher.createProcess(appId, executorId, resource, executorConfig, + options, classPath, ctx.mainClass, ctx.arguments) + + ProcessInfo(process, jarPath, configFile) + } + + new ExecutorHandler { + + var destroyed = false + + override def destroy(): Unit = { + LOG.info(s"Destroy executor process ${ctx.mainClass}") + 
if (!destroyed) { + destroyed = true + process.foreach { info => + info.process.destroy() + info.jarPath.foreach(new File(_).delete()) + new File(info.configFile).delete() + } + } + } + + override def exitValue: Future[Int] = { + process.flatMap { info => + val exit = info.process.exitValue() + if (exit == 0) { + Future.successful(0) + } else { + Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " + + s"error summary: ${info.process.logger.error}")) + } + } + } + } + } + + private def expandEnviroment(path: String): String = { + // TODO: extend this to support more environment. + path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME)) + } + + override def preStart(): Unit = { + executorHandler.exitValue.onComplete { value => + procLauncher.cleanProcess(appId, executorId) + val result = ExecutorResult(value) + self ! result + } + } + + override def postStop(): Unit = { + executorHandler.destroy() + } + + // The folders are under ${GEARPUMP_HOME} + val daemonPathPattern = List("lib" + File.separator + "yarn") + + override def receive: Receive = { + case ShutdownExecutor(appId, executorId, reason: String) => + executorHandler.destroy() + sender ! 
ShutdownExecutorSucceed(appId, executorId) + context.stop(self) + case ExecutorResult(executorResult) => + executorResult match { + case Success(exit) => LOG.info("Executor exit normally with exit value " + exit) + case Failure(e) => LOG.error("Executor exit with errors", e) + } + context.stop(self) + } + + private def getFormatedTime(timestamp: Long): String = { + val datePattern = "yyyy-MM-dd-HH-mm" + val format = new java.text.SimpleDateFormat(datePattern) + format.format(timestamp) + } + + private def filterOutDaemonLib(classPath: Array[String]): Array[String] = { + classPath.filterNot(matchDaemonPattern(_)) + } + + private def matchDaemonPattern(path: String): Boolean = { + daemonPathPattern.exists(path.contains(_)) + } + } + + trait ExecutorHandler { + def destroy(): Unit + def exitValue: Future[Int] + } + + case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String) + + /** + * Starts the executor in the same JVM as worker. + */ + class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int]) + extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) { + private val exitCode = 0 + + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) { + case ex: Throwable => + LOG.error(s"system $name stopped ", ex) + exit.failure(ex) + Stop + } + + override def postStop(): Unit = { + if (!exit.isCompleted) { + exit.success(exitCode) + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala new file mode 100644 index 0000000..0a22245 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.cluster + +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.pattern.ask +import akka.testkit.TestActorRef +import com.typesafe.config.ConfigValueFactory +import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers +import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList +import org.apache.gearpump.cluster.master.Master +import org.apache.gearpump.cluster.worker.Worker +import org.apache.gearpump.util.Constants + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +class MiniCluster { + private val mockMasterIP = "127.0.0.1" + + implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG. + withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP))) + + val (mockMaster, worker) = { + val master = system.actorOf(Props(classOf[Master]), "master") + val worker = system.actorOf(Props(classOf[Worker], master), "worker") + + // Wait until worker register itself to master + waitUtilWorkerIsRegistered(master) + (master, worker) + } + + def launchActor(props: Props): TestActorRef[Actor] = { + TestActorRef(props) + } + + private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = { + while (!isWorkerRegistered(master)) {} + } + + private def isWorkerRegistered(master: ActorRef): Boolean = { + import scala.concurrent.duration._ + implicit val dispatcher = system.dispatcher + + implicit val futureTimeout = Constants.FUTURE_TIMEOUT + + val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]] + + // Waits until the worker is registered. 
+ val workers = Await.result[WorkerList](workerListFuture, 15.seconds) + workers.workers.size > 0 + } + + def shutDown(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala new file mode 100644 index 0000000..f9b0762 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.{Actor, ActorRef, Props} +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} +import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager} +import org.apache.gearpump.cluster.master.AppManager._ +import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} +import org.apache.gearpump.cluster.{TestUtil, _} +import org.apache.gearpump.util.LogUtil +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.util.Success + +class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + var kvService: TestProbe = null + var haService: TestProbe = null + var appLauncher: TestProbe = null + var appManager: ActorRef = null + private val LOG = LogUtil.getLogger(getClass) + + override def config: Config = TestUtil.DEFAULT_CONFIG + + override def beforeEach(): Unit = { + startActorSystem() + kvService = TestProbe()(getActorSystem) + appLauncher = TestProbe()(getActorSystem) + + appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, + new DummyAppMasterLauncherFactory(appLauncher)))) + kvService.expectMsgType[GetKV] + kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty))) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "AppManager" should "handle AppMaster message correctly" in { + val appMaster = TestProbe()(getActorSystem) + val appId = 1 + + val register = 
RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName")) + appMaster.send(appManager, register) + appMaster.expectMsgType[AppMasterRegistered] + + appMaster.send(appManager, ActivateAppMaster(appId)) + appMaster.expectMsgType[AppMasterActivated] + } + + "DataStoreService" should "support Put and Get" in { + val appMaster = TestProbe()(getActorSystem) + appMaster.send(appManager, SaveAppData(0, "key", 1)) + kvService.expectMsgType[PutKV] + kvService.reply(PutKVSuccess) + appMaster.expectMsg(AppDataSaved) + + appMaster.send(appManager, GetAppData(0, "key")) + kvService.expectMsgType[GetKV] + kvService.reply(GetKVSuccess("key", 1)) + appMaster.expectMsg(GetAppDataResult("key", 1)) + } + + "AppManager" should "support application submission and shutdown" in { + testClientSubmission(withRecover = false) + } + + "AppManager" should "support application submission and recover if appmaster dies" in { + LOG.info("=================testing recover==============") + testClientSubmission(withRecover = true) + } + + "AppManager" should "handle client message correctly" in { + val mockClient = TestProbe()(getActorSystem) + mockClient.send(appManager, ShutdownApplication(1)) + assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure) + + mockClient.send(appManager, ResolveAppId(1)) + assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) + + mockClient.send(appManager, AppMasterDataRequest(1)) + mockClient.expectMsg(AppMasterData(AppMasterNonExist)) + } + + "AppManager" should "reject the application submission if the app name already existed" in { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + val appId = 1 + + client.send(appManager, submit) + + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + 
appMaster.send(appManager, RegisterAppMaster(appMaster.ref, + AppMasterRuntimeInfo(appId, app.name))) + appMaster.expectMsgType[AppMasterRegistered] + + client.send(appManager, submit) + assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) + } + + def testClientSubmission(withRecover: Boolean): Unit = { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + val appId = 1 + + client.send(appManager, submit) + + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + appMaster.send(appManager, RegisterAppMaster(appMaster.ref, + AppMasterRuntimeInfo(appId, app.name))) + kvService.expectMsgType[PutKV] + appMaster.expectMsgType[AppMasterRegistered] + + client.send(appManager, ResolveAppId(appId)) + client.expectMsg(ResolveAppIdResult(Success(appMaster.ref))) + + client.send(appManager, AppMastersDataRequest) + client.expectMsgType[AppMastersData] + + client.send(appManager, AppMasterDataRequest(appId, false)) + client.expectMsgType[AppMasterData] + + if (!withRecover) { + client.send(appManager, ShutdownApplication(appId)) + client.expectMsg(ShutdownApplicationResult(Success(appId))) + } else { + // Do recovery + getActorSystem.stop(appMaster.ref) + kvService.expectMsgType[GetKV] + val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) + kvService.reply(GetKVSuccess(APP_STATE, appState)) + appLauncher.expectMsg(LauncherStarted(appId)) + } + } +} + +class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { + override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], + username: String, master: ActorRef, client: Option[ActorRef]): Props = { + Props(new DummyAppMasterLauncher(test, appId)) + } +} + +class DummyAppMasterLauncher(test: TestProbe, appId: Int) 
extends Actor { + test.ref ! LauncherStarted(appId) + + override def receive: Receive = { + case any: Any => test.ref forward any + } +} + +case class LauncherStarted(appId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala new file mode 100644 index 0000000..d3e739f --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.Props +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.gearpump.cluster.master.InMemoryKVService +import org.apache.gearpump.cluster.master.InMemoryKVService._ +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.concurrent.duration._ + +class InMemoryKVServiceSpec + extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + + override def beforeEach(): Unit = { + startActorSystem() + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + override def config: Config = TestUtil.MASTER_CONFIG + + "KVService" should "get, put, delete correctly" in { + val system = getActorSystem + val kvService = system.actorOf(Props(new InMemoryKVService())) + val group = "group" + + val client = TestProbe()(system) + + client.send(kvService, PutKV(group, "key", 1)) + client.expectMsg(PutKVSuccess) + + client.send(kvService, PutKV(group, "key", 2)) // overwrites the value stored above + client.expectMsg(PutKVSuccess) + + client.send(kvService, GetKV(group, "key")) + client.expectMsg(GetKVSuccess("key", 2)) + + client.send(kvService, DeleteKVGroup(group)) + + // After DeleteKVGroup, the service no longer accepts GetKV or PutKV messages for this group, so neither gets a reply.
+ client.send(kvService, GetKV(group, "key")) + client.expectNoMsg(3.seconds) + + client.send(kvService, PutKV(group, "key", 3)) + client.expectNoMsg(3.seconds) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala new file mode 100644 index 0000000..2166976 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.cluster.main + +import java.util.Properties + +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} +import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered +import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{Constants, LogUtil, Util} +import org.scalatest._ + +import scala.concurrent.Future +import scala.util.{Success, Try} + +class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + + private val LOG = LogUtil.getLogger(getClass) + + override def config: Config = TestUtil.DEFAULT_CONFIG + + override def beforeEach(): Unit = { + startActorSystem() + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "Worker" should "register worker address to master when started."
+ in { + + val masterReceiver = createMockMaster() + + val tempTestConf = convertTestConf(getHost, getPort) + + val options = Array( + s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}", + s"-D${PREFER_IPV4}=true" + ) ++ getMasterListOption() + + val worker = Util.startProcess(options, + getContextClassPath, + getMainClassName(Worker), + Array.empty) + + try { + masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker) + } finally { + worker.destroy() + // Remove the temporary config even when the expectation above fails. + tempTestConf.delete() + } + } + + "Master" should "accept worker RegisterNewWorker when started" in { + val worker = TestProbe()(getActorSystem) + + val host = "127.0.0.1" + val port = Util.findFreePort().get + + val properties = new Properties() + properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port") + properties.put(s"${GEARPUMP_HOSTNAME}", s"$host") + val masterConfig = ConfigFactory.parseProperties(properties) + .withFallback(TestUtil.MASTER_CONFIG) + Future { + Master.main(masterConfig, Array("-ip", host, "-port", port.toString)) + } + + val masterProxy = getActorSystem.actorOf( + MasterProxy.props(List(HostPort(host, port))), "mainSpec") + + worker.send(masterProxy, RegisterNewWorker) + worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME) + } + + "Info" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) + masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) + } + + "Kill" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + Kill.main(masterConfig, Array("-appid", "0")) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0)) + masterReceiver.reply(ShutdownApplicationResult(Success(0))) + } + + "Replay" should "be started without exception" in { + + val masterReceiver =
+ createMockMaster() + + Future { + Replay.main(masterConfig, Array("-appid", "0")) + } + + masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME) + masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref))) + masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME) + masterReceiver.reply(ReplayApplicationResult(Success(0))) + } + + "Local" should "be started without exception" in { + val port = Util.findFreePort().get + val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port", + s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost", + s"-D${PREFER_IPV4}=true") + + val local = Util.startProcess(options, + getContextClassPath, + getMainClassName(Local), + Array.empty) + + def retry(times: Int)(fn: => Boolean): Boolean = { + + LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..") + + val result = fn + if (result || times <= 0) { + result + } else { + Thread.sleep(1000) + retry(times - 1)(fn) + } + } + + try { + assert(retry(10)(isPortUsed("127.0.0.1", port)), + "local is not started successfully, as port is not used " + port) + } finally { + local.destroy() + } + } + + "Gear" should "support app|info|kill|shell|replay" in { + + val commands = Array("app", "info", "kill", "shell", "replay") + + assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw") + + for (command <- commands) { // NOTE(review): `command` only appears in the failure message; every iteration calls Gear.main(Array("-noexist")), confirm whether Array(command, "-noexist") was intended + assert(Try(Gear.main(Array("-noexist"))).isFailure, + "pass unknown option, throw, command: " + command) + } + + assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ") + + val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist"))) + assert(tryThis.isFailure, "unknown command, throw") + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git
a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala new file mode 100644 index 0000000..b48fc2a --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.cluster.main + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.gearpump.cluster.TestUtil +import org.scalatest.{FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class MasterWatcherSpec extends FlatSpec with Matchers { + def config: Config = TestUtil.MASTER_CONFIG + + "MasterWatcher" should "kill itself when can not get a quorum" in { + val system = ActorSystem("ForMasterWatcher", config) + try { + val actorWatcher = TestProbe()(system) + val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) + actorWatcher watch masterWatcher + actorWatcher.expectTerminated(masterWatcher, 5.seconds) + } finally { // Shut the ActorSystem down even when the expectation fails, so it does not leak into later tests. + Await.result(system.terminate(), Duration.Inf) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala new file mode 100644 index 0000000..8a3d7d1 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.scheduler + +import akka.actor.{ActorSystem, Props} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource +import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} +import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import org.apache.gearpump.cluster.worker.WorkerId +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.concurrent.duration._ + +class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) + val appId = 0 + val workerId1: WorkerId = WorkerId(1, 0L) + val workerId2: WorkerId = WorkerId(2, 0L) + val mockAppMaster = TestProbe() + val mockWorker1 = TestProbe() + val mockWorker2 = TestProbe() + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "The scheduler" should { + "update resource only when the worker is registered" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler !
ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)) + expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " + + s"registered into master")) + } + + "drop application's resource requests when the application is removed" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + mockAppMaster.expectNoMsg(5.seconds) + } + } + + def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = { + left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId)) + } + + "The resource request with higher priority" should { + "be handled first" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY) + val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY) + + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) // sent twice on purpose: the second copy is served later by worker2 + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)),
mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The resource request which delivered earlier" should { + "be handled first if the priorities are the same" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) +
mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The PriorityScheduler" should { + "handle the resource request with different relaxation" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER) + val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER) + + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + val request3 = ResourceRequest( + Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + + expect = ResourceAllocated(Array( + ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), + ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))) +
mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + // We have to manually update the resource on each worker + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The PriorityScheduler" should { + "handle the resource request with different executor number" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + // By default, the request requires only one executor + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations2.allocations.length == 1) + assert(allocations2.allocations.head.resource == Resource(20)) + + val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations3.allocations.length == 3) +
assert(allocations3.allocations.forall(_.resource == Resource(8))) + + // The total available resource can not satisfy the requirements with executor number + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations4.allocations.length == 2) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + + // When new resources are available, the remaining request will be satisfied + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) + val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations5.allocations.length == 1) + assert(allocations5.allocations.forall(_.resource == Resource(20))) // the remaining third executor of request4 (60 / 3 = 20) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala new file mode 100644 index 0000000..e0233f8 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.worker + +import akka.actor.{ActorSystem, PoisonPill, Props} +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} +import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} +import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} +import org.scalatest._ + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { + override def config: Config = TestUtil.DEFAULT_CONFIG + + val appId = 1 + val workerId: WorkerId = WorkerId(1, 0L) + val executorId = 1 + var masterProxy: TestProbe = null + var mockMaster: TestProbe = null + var client: TestProbe = null + val workerSlots = 50 + + override def beforeEach(): Unit = { + startActorSystem() +
mockMaster = TestProbe()(getActorSystem) + masterProxy = TestProbe()(getActorSystem) + client = TestProbe()(getActorSystem) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "The new started worker" should { + "kill itself if no response from Master after registering" in { + val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) + mockMaster watch worker + mockMaster.expectMsg(RegisterNewWorker) + mockMaster.expectTerminated(worker, 60.seconds) + } + } + + "Worker" should { + "init its resource from the gearpump config" in { + val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots"). + withFallback(TestUtil.DEFAULT_CONFIG) + val workerSystem = ActorSystem("WorkerSystem", config) + try { + val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) + mockMaster watch worker + mockMaster.expectMsg(RegisterNewWorker) + worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots))) + worker.tell( + UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref) + mockMaster.expectTerminated(worker, 5.seconds) + } finally { // Release the extra ActorSystem even when an expectation above fails. + Await.result(workerSystem.terminate(), Duration.Inf) + } + } + } + + "Worker" should { + "update its remaining resource when launching and shutting down executors" in { + val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref)) + masterProxy.expectMsg(RegisterNewWorker) + + worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) + + val executorName = ActorUtil.actorNameForExecutor(appId, executorId) + // This is an actor path which the ActorSystemBooter will report back to, + // not needed in this test + val reportBack = "dummy" + val executionContext = ExecutorJVMConfig(Array.empty[String], +
getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), + classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, + username = "user") + + // Test LaunchExecutor + worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), + mockMaster.ref) + mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine")) + + worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95))) + + worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98))) + + // Test terminationWatch + worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) + client.expectMsg(ShutdownExecutorSucceed(appId, executorId)) + + worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref) + client.expectMsg(ShutdownExecutorFailed( + s"Can not find executor ${executorId + 1} for app $appId")) + + mockMaster.ref ! PoisonPill + masterProxy.expectMsg(RegisterWorker(workerId)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala deleted file mode 100644 index 9e55be6..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster - -import akka.actor.ActorRef - -import org.apache.gearpump.cluster.master.Master.MasterInfo -import org.apache.gearpump.cluster.scheduler.Resource -import org.apache.gearpump.cluster.worker.WorkerId - -/** - * Cluster Bootup Flow - */ -object WorkerToMaster { - - /** When an worker is started, it sends RegisterNewWorker */ - case object RegisterNewWorker - - /** When worker lose connection with master, it tries to register itself again with old Id.
*/ - case class RegisterWorker(workerId: WorkerId) - - /** Worker is responsible to broadcast its current status to master */ - case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource) -} - -object MasterToWorker { - - /** Master confirm the reception of RegisterNewWorker or RegisterWorker */ - case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo) - - /** Worker have not received reply from master */ - case class UpdateResourceFailed(reason: String = null, ex: Throwable = null) - - /** Master is synced with worker on resource slots managed by current worker */ - case object UpdateResourceSucceed -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala deleted file mode 100644 index 9bde4d1..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.embedded - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorRef, ActorSystem, Props} -import com.typesafe.config.{Config, ConfigValueFactory} - -import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.master.{Master => MasterActor} -import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} -import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER} -import org.apache.gearpump.util.{LogUtil, Util} - -/** - * Create a in-process cluster with single worker - */ -class EmbeddedCluster(inputConfig: Config) { - - private val workerCount: Int = 1 - private var _master: ActorRef = null - private var _system: ActorSystem = null - private var _config: Config = null - - private val LOG = LogUtil.getLogger(getClass) - - def start(): Unit = { - val port = Util.findFreePort().get - val akkaConf = getConfig(inputConfig, port) - _config = akkaConf - val system = ActorSystem(MASTER, akkaConf) - - val master = system.actorOf(Props[MasterActor], MASTER) - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - this._master = master - this._system = system - - LOG.info("=================================") - LOG.info("Local Cluster is started at: ") - LOG.info(s" 127.0.0.1:$port") - LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") - } - - private def getConfig(inputConfig: Config, port: Int): Config = { - val config = inputConfig. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
- withValue(GEARPUMP_CLUSTER_MASTERS, - ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). - withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, - ConfigValueFactory.fromAnyRef(true)). - withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)). - withValue("akka.actor.provider", - ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider")) - config - } - - def newClientContext: ClientContext = { - ClientContext(_config, _system, _master) - } - - def stop(): Unit = { - _system.stop(_master) - _system.terminate() - Await.result(_system.whenTerminated, Duration.Inf) - } -} - -object EmbeddedCluster { - def apply(): EmbeddedCluster = { - new EmbeddedCluster(ClusterConfig.master()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala deleted file mode 100644 index db71b7b..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.main - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, Props} -import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger - -import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.master.{Master => MasterActor} -import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.util.LogUtil.ProcessType -import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util} - -object Local extends AkkaApp with ArgumentsParser { - override def akkaConfig: Config = ClusterConfig.master() - - var LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = - Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), - "workernum" -> CLIOption[Int]("<how many workers to start>", required = false, - defaultValue = Some(2))) - - override val description = "Start a local cluster" - - def main(akkaConf: Config, args: Array[String]): Unit = { - - this.LOG = { - LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL) - LogUtil.getLogger(getClass) - } - - val config = parse(args) - if (null != config) { - local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf) - } - } - - def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = { - if (sameProcess) { - LOG.info("Starting local in same process") - System.setProperty("LOCAL", "true") - } - val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) - .asScala.flatMap(Util.parseHostList) - val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) - - if (masters.size != 1 && masters.head.host != local) { - LOG.error(s"The 
${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " + - s"with ${Constants.GEARPUMP_HOSTNAME}") - } else { - - val hostPort = masters.head - implicit val system = ActorSystem(MASTER, akkaConf. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port)) - ) - - val master = system.actorOf(Props[MasterActor], MASTER) - val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER" - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - - Await.result(system.whenTerminated, Duration.Inf) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala deleted file mode 100644 index f1b9bdf..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.cluster.main - -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor._ -import akka.cluster.ClusterEvent._ -import akka.cluster.ddata.DistributedData -import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings} -import akka.cluster.{Cluster, Member, MemberStatus} -import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger - -import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode} -import org.apache.gearpump.cluster.master.Master.MasterListUpdated -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.util.LogUtil.ProcessType -import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil} - -object Master extends AkkaApp with ArgumentsParser { - - private var LOG: Logger = LogUtil.getLogger(getClass) - - override def akkaConfig: Config = ClusterConfig.master() - - override val options: Array[(String, CLIOption[Any])] = - Array("ip" -> CLIOption[String]("<master ip address>", required = true), - "port" -> CLIOption("<master port>", required = true)) - - override val description = "Start Master daemon" - - def main(akkaConf: Config, args: Array[String]): Unit = { - - this.LOG = { - LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER) - LogUtil.getLogger(getClass) - } - - val config = parse(args) - master(config.getString("ip"), config.getInt("port"), akkaConf) - } - - private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = { - masters.exists { hostPort => - hostPort == s"$master:$port" - } - } - - private def master(ip: String, port: Int, akkaConf: Config): Unit = { - val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala - - if (!verifyMaster(ip, port, 
masters)) { - LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " + - s"gearpump.cluster.masters: ${masters.mkString(", ")}") - System.exit(-1) - } - - val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava - val quorum = masterList.size() / 2 + 1 - val masterConfig = akkaConf. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). - withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)). - withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)). - withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", - ConfigValueFactory.fromAnyRef(quorum)) - - LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}") - val system = ActorSystem(MASTER, masterConfig) - - val replicator = DistributedData(system).replicator - LOG.info(s"Replicator path: ${replicator.path}") - - // Starts singleton manager - val singletonManager = system.actorOf(ClusterSingletonManager.props( - singletonProps = Props(classOf[MasterWatcher], MASTER), - terminationMessage = PoisonPill, - settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER) - .withRole(MASTER)), - name = SINGLETON_MANAGER) - - // Start master proxy - val masterProxy = system.actorOf(ClusterSingletonProxy.props( - singletonManagerPath = s"/user/${SINGLETON_MANAGER}", - // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}". - // Master is created when there is a majority of machines started. 
- settings = ClusterSingletonProxySettings(system) - .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), - name = MASTER - ) - - LOG.info(s"master proxy is started at ${masterProxy.path}") - - val mainThread = Thread.currentThread() - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run(): Unit = { - if (!system.whenTerminated.isCompleted) { - LOG.info("Triggering shutdown hook....") - - system.stop(masterProxy) - val cluster = Cluster(system) - cluster.leave(cluster.selfAddress) - cluster.down(cluster.selfAddress) - try { - Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) - } catch { - case ex: Exception => // Ignore - } - system.terminate() - mainThread.join() - } - } - }) - - Await.result(system.whenTerminated, Duration.Inf) - } -} - -class MasterWatcher(role: String) extends Actor with ActorLogging { - import context.dispatcher - - val cluster = Cluster(context.system) - - val config = context.system.settings.config - val masters = config.getList("akka.cluster.seed-nodes") - val quorum = masters.size() / 2 + 1 - - val system = context.system - - // Sorts by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } - var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) - - def receive: Receive = null - - // Subscribes to MemberEvent, re-subscribe when restart - override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - context.become(waitForInit) - } - override def postStop(): Unit = { - cluster.unsubscribe(self) - } - - def matchingRole(member: Member): Boolean = member.hasRole(role) - - def waitForInit: Receive = { - case state: CurrentClusterState => { - membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m => - m.status == MemberStatus.Up && matchingRole(m)) - - if (membersByAge.size < quorum) { - membersByAge.iterator.mkString(",") - log.info(s"We cannot get a quorum, $quorum, " + - 
s"shutting down...${membersByAge.iterator.mkString(",")}") - context.become(waitForShutdown) - self ! MasterWatcher.Shutdown - } else { - val master = context.actorOf(Props(classOf[MasterActor]), MASTER) - notifyMasterMembersChange(master) - context.become(waitForClusterEvent(master)) - } - } - } - - def waitForClusterEvent(master: ActorRef): Receive = { - case MemberUp(m) if matchingRole(m) => { - membersByAge += m - notifyMasterMembersChange(master) - } - case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || - mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { - log.info(s"member removed ${mEvent.member}") - val m = mEvent.member - membersByAge -= m - if (membersByAge.size < quorum) { - log.info(s"We cannot get a quorum, $quorum, " + - s"shutting down...${membersByAge.iterator.mkString(",")}") - context.become(waitForShutdown) - self ! MasterWatcher.Shutdown - } else { - notifyMasterMembersChange(master) - } - } - } - - private def notifyMasterMembersChange(master: ActorRef): Unit = { - val masters = membersByAge.toList.map{ member => - MasterNode(member.address.host.getOrElse("Unknown-Host"), - member.address.port.getOrElse(0)) - } - master ! 
MasterListUpdated(masters) - } - - def waitForShutdown: Receive = { - case MasterWatcher.Shutdown => { - cluster.unsubscribe(self) - cluster.leave(cluster.selfAddress) - context.stop(self) - system.scheduler.scheduleOnce(Duration.Zero) { - try { - Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) - } catch { - case ex: Exception => // Ignore - } - system.terminate() - } - } - } -} - -object MasterWatcher { - object Shutdown -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala deleted file mode 100644 index 58a9dec..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.cluster.main - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, Props} -import org.slf4j.Logger - -import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.transport.HostPort -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} -import org.apache.gearpump.util.LogUtil.ProcessType -import org.apache.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to start a worker daemon process */ -object Worker extends AkkaApp with ArgumentsParser { - protected override def akkaConfig = ClusterConfig.worker() - - override val description = "Start a worker daemon" - - var LOG: Logger = LogUtil.getLogger(getClass) - - private def uuid = java.util.UUID.randomUUID.toString - - def main(akkaConf: Config, args: Array[String]): Unit = { - val id = uuid - - this.LOG = { - LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER) - // Delay creation of LOG instance to avoid creating an empty log file as we - // reset the log file name here - LogUtil.getLogger(getClass) - } - - val system = ActorSystem(id, akkaConf) - - val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address => - val hostAndPort = address.split(":") - HostPort(hostAndPort(0), hostAndPort(1).toInt) - } - - LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...") - val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}") - - system.actorOf(Props(classOf[WorkerActor], masterProxy), - classOf[WorkerActor].getSimpleName + id) - - Await.result(system.whenTerminated, Duration.Inf) - } -} \ No newline at end of file