http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala deleted file mode 100644 index 26208d0..0000000 --- a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rpc - -import java.io.IOException -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable -import scala.concurrent.Future -import scala.reflect.ClassTag -import scala.util.Random - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.util.CarbonProperties - -/** - * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send request - */ -private[rpc] class Scheduler { - // mapping of worker IP address to worker instance - private val workers = mutable.Map[String, Schedulable]() - private val random = new Random() - - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - /** - * Pick a Worker according to the address and workload of the Worker - * Invoke the RPC and return Future result - */ - def sendRequestAsync[T: ClassTag]( - splitAddress: String, - request: Any): (Schedulable, Future[T]) = { - require(splitAddress != null) - if (workers.isEmpty) { - throw new IOException("No worker is available") - } - var worker = pickWorker(splitAddress) - - // check whether worker exceed max workload, if exceeded, pick next worker - val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores) - var numTry = workers.size - do { - if (worker.workload.get() >= maxWorkload) { - LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...") - worker = pickNextWorker(worker) - numTry = numTry - 1 - } else { - numTry = -1 - } - } while (numTry > 0) - if (numTry == 0) { - // tried so many times and still not able to find Worker - throw new WorkerTooBusyException( - s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload") - } - LOG.info(s"sending search request to worker ${worker.address}:${worker.port}") - val future = worker.ref.ask(request) - worker.workload.incrementAndGet() - (worker, future) - } - - private def pickWorker[T: ClassTag](splitAddress: String) = { - try { - workers(splitAddress) - } catch { - case e: NoSuchElementException => - // no local worker available, choose one worker randomly - pickRandomWorker() - } - } - - /** pick a worker randomly */ - private def pickRandomWorker() = { - val index = 
random.nextInt(workers.size) - workers.toSeq(index)._2 - } - - /** pick the next worker of the input worker in the [[Scheduler.workers]] */ - private def pickNextWorker(worker: Schedulable) = { - val index = workers.zipWithIndex.find { case ((address, w), index) => - w == worker - }.get._2 - if (index == workers.size - 1) { - workers.toSeq.head._2 - } else { - workers.toSeq(index + 1)._2 - } - } - - /** A new searcher is trying to register, add it to the map and connect to this searcher */ - def addWorker(address: String, schedulable: Schedulable): Unit = { - require(schedulable != null) - require(address.equals(schedulable.address)) - workers(address) = schedulable - } - - def removeWorker(address: String): Unit = { - workers.remove(address) - } - - def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator -} - -/** - * Represent a Worker which [[Scheduler]] can send - * Search request on it - * @param id Worker ID, a UUID string - * @param cores, number of cores in Worker - * @param ref RPC endpoint reference - * @param workload number of outstanding request sent to Worker - */ -private[rpc] class Schedulable( - val id: String, - val address: String, - val port: Int, - val cores: Int, - val ref: RpcEndpointRef, - var workload: AtomicInteger) { - def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = { - this(id, address, port, cores, ref, new AtomicInteger()) - } -} - -class WorkerTooBusyException(message: String) extends RuntimeException(message)
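Aside: the deleted Scheduler above implements locality-first, workload-capped worker selection — prefer the worker co-located with the data split, and if its outstanding request count has reached cores times the configured per-core limit, walk to the next worker, failing with WorkerTooBusyException once every worker has been tried. Below is a minimal, self-contained sketch of that selection loop for illustration only; SimpleScheduler, WorkerStub, maxWorkloadPerCore and pick are invented names, not part of the deleted class, and the sketch drops the RpcEndpointRef that the real Schedulable carries.

  import java.util.concurrent.atomic.AtomicInteger
  import scala.collection.mutable

  // Illustrative stand-in for Schedulable (no RPC reference).
  case class WorkerStub(id: String, address: String, cores: Int) {
    val workload = new AtomicInteger()
  }

  class SimpleScheduler(maxWorkloadPerCore: Int = 10) {
    private val workers = mutable.LinkedHashMap[String, WorkerStub]()

    def addWorker(w: WorkerStub): Unit = workers(w.address) = w

    /** Prefer the worker holding the split locally; otherwise start from a
     *  random worker. Skip any worker whose outstanding workload has reached
     *  cores * maxWorkloadPerCore, wrapping around at most once. */
    def pick(splitAddress: String): WorkerStub = {
      require(workers.nonEmpty, "No worker is available")
      val ordered = workers.values.toIndexedSeq
      val start = ordered.indexWhere(_.address == splitAddress) match {
        case -1 => scala.util.Random.nextInt(ordered.size) // no local worker
        case i  => i
      }
      (0 until ordered.size).iterator
        .map(offset => ordered((start + offset) % ordered.size))
        .find(w => w.workload.get() < w.cores * maxWorkloadPerCore)
        .getOrElse(throw new RuntimeException("All workers are busy"))
    }
  }

In the real code the counter is incremented after ref.ask(request) is sent and decremented by the response handler, which is why workload is an AtomicInteger rather than a plain var.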
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala deleted file mode 100644 index 0f2138a..0000000 --- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rpc - -import java.io.IOException -import java.net.{BindException, InetAddress} - -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success} - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.rpc.netty.NettyRpcEnvFactory -import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher} -import org.apache.spark.util.ThreadUtils - -import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.util.CarbonProperties - -@InterfaceAudience.Internal -object Worker { - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - private val hostAddress = InetAddress.getLocalHost.getHostAddress - private var port: Int = _ - - def init(masterHostAddress: String, masterPort: Int): Unit = { - LOG.info(s"initializing worker...") - startService() - LOG.info(s"registering to master $masterHostAddress:$masterPort") - val workerId = registerToMaster(masterHostAddress, masterPort) - LOG.info(s"worker registered to master, workerId: $workerId") - } - - /** - * Start to listen on port [[CarbonProperties.getSearchWorkerPort]] - */ - private def startService(): Unit = { - new Thread(new Runnable { - override def run(): Unit = { - port = CarbonProperties.getSearchWorkerPort - val conf = new SparkConf() - var rpcEnv: RpcEnv = null - var exception: BindException = null - var numTry = 100 // we will try to create service at worse case 100 times - do { - try { - LOG.info(s"starting search-service on $hostAddress:$port") - val config = RpcEnvConfig( - conf, s"worker-$hostAddress", hostAddress, "", port, - new SecurityManager(conf), clientMode = false) - rpcEnv = new NettyRpcEnvFactory().create(config) - numTry = 0 - } catch { - case e: BindException => - // port is occupied, increase the port number and try again - exception = e - LOG.error(s"start search-service failed: ${e.getMessage}") - port = port + 1 - numTry = numTry - 1 - } - } while (numTry > 0) - if (rpcEnv == null) { - // we have tried many times, but still failed to find an available port - throw exception - } - val searchEndpoint: RpcEndpoint = new 
Searcher(rpcEnv) - rpcEnv.setupEndpoint("search-service", searchEndpoint) - LOG.info("search-service started") - rpcEnv.awaitTermination() - } - }).start() - } - - private def registerToMaster(masterHostAddress: String, masterPort: Int): String = { - LOG.info(s"trying to register to master $masterHostAddress:$masterPort") - val conf = new SparkConf() - val config = RpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort, - new SecurityManager(conf), clientMode = true) - val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config) - - val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef( - RpcAddress(masterHostAddress, masterPort), "registry-service") - val cores = Runtime.getRuntime.availableProcessors() - - val request = RegisterWorkerRequest(hostAddress, port, cores) - val future = endPointRef.ask[RegisterWorkerResponse](request) - ThreadUtils.awaitResult(future, Duration.apply("10s")) - future.value match { - case Some(result) => - result match { - case Success(response) => - LOG.info("worker registered") - response.workerId - case Failure(throwable) => - LOG.error(s"worker failed to registered: $throwable") - throw new IOException(throwable.getMessage) - } - case None => - LOG.error("worker register timeout") - throw new ExecutionTimeoutException - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/scala/org/apache/spark/search/Registry.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala deleted file mode 100644 index 22e766d..0000000 --- a/store/search/src/main/scala/org/apache/spark/search/Registry.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.search - -import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} - -import org.apache.carbondata.common.logging.LogServiceFactory - -/** - * Registry service implementation. It adds worker to master. 
- */ -class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint { - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - override def onStart(): Unit = { - LOG.info("Registry Endpoint started") - } - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case req@RegisterWorkerRequest(_, _, _) => - val response = master.addWorker(req) - context.reply(response) - } - - override def onStop(): Unit = { - LOG.info("Registry Endpoint stopped") - } - -} - -case class RegisterWorkerRequest( - hostAddress: String, - port: Int, - cores: Int) - -case class RegisterWorkerResponse( - workerId: String) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/scala/org/apache/spark/search/Searcher.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala deleted file mode 100644 index 6fbea15..0000000 --- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.search - -import org.apache.spark.SerializableWritable -import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv} - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper -import org.apache.carbondata.core.metadata.schema.table.TableInfo -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit -import org.apache.carbondata.store.worker.SearchRequestHandler - -/** - * Search service implementation - */ -class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint { - private val LOG = LogServiceFactory.getLogService(this.getClass.getName) - - override def onStart(): Unit = { - LOG.info("Searcher Endpoint started") - } - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case req: SearchRequest => - val response = new SearchRequestHandler().handleSearch(req) - context.reply(response) - - case req: ShutdownRequest => - val response = new SearchRequestHandler().handleShutdown(req) - context.reply(response) - - } - - override def onStop(): Unit = { - LOG.info("Searcher Endpoint stopped") - } -} - -// Search request sent from master to worker -case class SearchRequest( - searchId: Int, - split: SerializableWritable[CarbonMultiBlockSplit], - tableInfo: TableInfo, - projectColumns: Array[String], - filterExpression: Expression, - limit: Long) - -// Search result sent from worker to master -case class SearchResult( - queryId: Int, - status: Int, - message: String, - rows: Array[Array[Object]]) - -// Shutdown request sent from master to worker -case class ShutdownRequest( - reason: String) - -// Shutdown response sent from worker to master -case class ShutdownResponse( - status: Int, - message: String) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java ---------------------------------------------------------------------- diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java deleted file mode 100644 index 88d925f..0000000 --- a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -public class SearchServiceTest { -// @Test -// public void testStartStopService() throws IOException, ExecutionException, InterruptedException { -// Master master = new Master(9999); -// master.startService(); -// -// Worker worker = Worker.getInstance(); -// worker.init(InetAddress.getLocalHost().getHostName(), 9999); -// -// Set<String> workers = master.getWorkers(); -// Assert.assertEquals(1, workers.size()); -// Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]); -// -// master.stopAllWorkers(); -// master.stopService(); -// } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala deleted file mode 100644 index 8780dc0..0000000 --- a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.rpc - -import scala.concurrent.Future -import scala.reflect.ClassTag - -import org.apache.spark.SparkConf -import org.scalatest.{BeforeAndAfterEach, FunSuite} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -class SchedulerSuite extends FunSuite with BeforeAndAfterEach { - - var scheduler: Scheduler = _ - var w1: Schedulable = _ - var w2: Schedulable = _ - var w3: Schedulable = _ - - override def beforeEach(): Unit = { - scheduler = new Scheduler() - w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef()) - w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef()) - w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef()) - - scheduler.addWorker("1.1.1.1", w1) - scheduler.addWorker("1.1.1.2", w2) - scheduler.addWorker("1.1.1.3", w3) - } - - test("test addWorker, removeWorker, getAllWorkers") { - assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet) - - scheduler.removeWorker("1.1.1.2") - assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet) - - val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef()) - scheduler.addWorker("1.1.1.4", w4) - assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet) - assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id) - } - - test("test normal schedule") { - val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) - assertResult(w1.id)(r1.id) - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - assertResult(w2.id)(r2.id) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - assertResult(w3.id)(r3.id) - val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null) - assertResult(w1.id)(r4.id) - val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null) - assertResult(w2.id)(r5.id) - val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null) - assertResult(w3.id)(r6.id) - } - - test("test worker unavailable") { - val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null) - assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id)) - } - - test("test reschedule when target worker is overload") { - // by default, maxWorkload is number of core * 10, so it is 40 in this test suite - (1 to 40).foreach { i => - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - } - val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null) - // it must be worker1 since worker3 exceed max workload - assertResult(w1.id)(r.id) - } - - test("test all workers are overload") { - // by default, maxWorkload is number of core * 10, so it is 40 in this test suite - (1 to 40).foreach { i => - val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - } - - val e = intercept[WorkerTooBusyException] { - scheduler.sendRequestAsync("1.1.1.3", null) - } - } - - test("test user configured overload param") { - val original = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) - - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3") - - (1 to 3).foreach { i => - val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) 
- } - - val e = intercept[WorkerTooBusyException] { - scheduler.sendRequestAsync("1.1.1.3", null) - } - - if (original != null) { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original) - } - } - - test("test invalid property") { - intercept[IllegalArgumentException] { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3") - } - var value = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) - assertResult(null)(value) - - intercept[NumberFormatException] { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s") - } - value = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) - assertResult(null)(value) - } -} - -class DummyRef extends RpcEndpointRef(new SparkConf) { - override def address: RpcAddress = null - - override def name: String = "" - - override def send(message: Any): Unit = { } - - override def ask[T](message: Any, timeout: RpcTimeout) - (implicit evidence$1: ClassTag[T]): Future[T] = null -} \ No newline at end of file
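Aside: the commented-out SearchServiceTest and the deleted Worker.registerToMaster above both exercise the same registration round trip — the worker sends a RegisterWorkerRequest(hostAddress, port, cores) to the master's "registry-service" endpoint, blocks up to 10 seconds, and reads back a RegisterWorkerResponse carrying its worker id. The following is a minimal sketch of that ask-and-await pattern using plain scala.concurrent in place of Spark's RpcEndpointRef.ask; askMaster and register are illustrative names, and the UUID reply merely stands in for whatever id the real Master assigns.

  import java.util.UUID
  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}

  case class RegisterWorkerRequest(hostAddress: String, port: Int, cores: Int)
  case class RegisterWorkerResponse(workerId: String)

  // Stand-in for the master side: Registry.receiveAndReply answers a
  // RegisterWorkerRequest with a fresh worker id (here, just a UUID).
  def askMaster(req: RegisterWorkerRequest): Future[RegisterWorkerResponse] =
    Future(RegisterWorkerResponse(UUID.randomUUID().toString))

  // Worker side: wait up to 10 seconds, then inspect the completed value,
  // mirroring the Success / Failure / timeout branches in registerToMaster.
  def register(host: String, port: Int, cores: Int): String = {
    val future = askMaster(RegisterWorkerRequest(host, port, cores))
    Await.ready(future, 10.seconds)
    future.value match {
      case Some(Success(response)) => response.workerId
      case Some(Failure(t))        => throw new java.io.IOException(t.getMessage)
      case None                    => throw new RuntimeException("worker register timeout")
    }
  }

The deleted Worker used ThreadUtils.awaitResult with a "10s" Duration and then matched on future.value in exactly this three-way fashion, raising ExecutionTimeoutException in the None branch.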