http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index 3b5b5ca..4985718 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -294,17 +294,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
     val expectedAnswer1 = sql(s"select * from $normalTable where id = 1").collect()
     val expectedAnswer2 = sql(s"select * from $normalTable where city in ('city_999')").collect()
 
-    carbonSession.startSearchMode()
-    assert(carbonSession.isSearchModeEnabled)
-
-    checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where id = 1"), expectedAnswer1)
-    checkAnswer(
-      sql(s"select * from $bloomDMSampleTable where city in ('city_999')"), expectedAnswer2)
-
-    carbonSession.stopSearchMode()
-    assert(!carbonSession.isSearchModeEnabled)
-
     sql(s"DROP TABLE IF EXISTS $normalTable")
     sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
   }
@@ -975,10 +964,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
   }
 
   override protected def afterAll(): Unit = {
-    // in case of search mode test case failed, stop search mode again
-    if (carbonSession.isSearchModeEnabled) {
-      carbonSession.stopSearchMode()
-    }
     deleteFile(bigFile)
     deleteFile(smallFile)
     sql(s"DROP TABLE IF EXISTS $normalTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 709f346..a37bf30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,6 @@
     <module>integration/spark-common-test</module>
     <module>datamap/examples</module>
     <module>store/sdk</module>
-    <module>store/search</module>
     <module>assembly</module>
     <module>tools/cli</module>
   </modules>
@@ -536,8 +535,6 @@
             <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
            <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
             <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-            <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
-            <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
           </sourceDirectories>
@@ -599,8 +596,6 @@
             <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
             <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-            <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
-            <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
           </sourceDirectories>
@@ -658,8 +653,6 @@
             <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
             <sourceDirectory>${basedir}/store/sdk/src/main/java</sourceDirectory>
-            <sourceDirectory>${basedir}/store/search/src/main/scala</sourceDirectory>
-            <sourceDirectory>${basedir}/store/search/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/datamap/bloom/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/datamap/lucene/src/main/java</sourceDirectory>
           </sourceDirectories>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
deleted file mode 100644
index 2e2628a..0000000
--- a/store/search/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.carbondata</groupId>
-    <artifactId>carbondata-parent</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>carbondata-search</artifactId>
-  <name>Apache CarbonData :: Search </name>
-
-  <properties>
-    <dev.path>${basedir}/../../dev</dev.path>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-hadoop</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
-        <executions>
-          <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-          <execution>
-            <id>testCompile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-            <phase>test</phase>
-          </execution>
-          <execution>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <!-- Note config is repeated in surefire config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>CarbonTestSuite.txt</filereports>
-          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
-          </argLine>
-          <stderr />
-          <environmentVariables>
-          </environmentVariables>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
-          </systemProperties>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
deleted file mode 100644
index 6492a9b..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.spark.search.SearchRequest;
-import org.apache.spark.search.SearchResult;
-import org.apache.spark.search.ShutdownRequest;
-import org.apache.spark.search.ShutdownResponse;
-
-/**
- * Thread runnable for handling SearchRequest from master.
- */
-@InterfaceAudience.Internal
-public class SearchRequestHandler {
-
-  private static final Logger LOG =
-      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
-
-  public SearchResult handleSearch(SearchRequest request) {
-    try {
-      LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
-      List<CarbonRow> rows = handleRequest(request);
-      LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
-      return createSuccessResponse(request, rows);
-    } catch (IOException e) {
-      LOG.error(e);
-      LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
-      return createFailureResponse(request, e);
-    }
-  }
-
-  public ShutdownResponse handleShutdown(ShutdownRequest request) {
-    LOG.info("Shutting down worker...");
-    SearchModeDetailQueryExecutor.shutdownThreadPool();
-    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
-    LOG.info("Worker shutted down");
-    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
-  }
-
-  private DataMapExprWrapper chooseFGDataMap(
-      CarbonTable table,
-      FilterResolverIntf filterInterface) throws IOException {
-    DataMapChooser chooser = new DataMapChooser(table);
-    return chooser.chooseFGDataMap(filterInterface);
-  }
-
-  /**
-   * Builds {@link QueryModel} and read data from files
-   */
-  private List<CarbonRow> handleRequest(SearchRequest request)
-      throws IOException {
-    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
-    carbonTaskInfo.setTaskId(CarbonUtil.generateUUID());
-    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
-    TableInfo tableInfo = request.tableInfo();
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
-    QueryModel queryModel = createQueryModel(table, request);
-
-    // in search mode, plain reader is better since it requires less memory
-    queryModel.setVectorReader(false);
-
-    CarbonMultiBlockSplit mbSplit = request.split().value();
-    List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
-    queryModel.setTableBlockInfos(list);
-    long limit = request.limit();
-    long rowCount = 0;
-
-    LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
-        request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
-    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
-        queryModel.getFilterExpressionResolverTree());
-
-    // If there is DataMap selected in Master, prune the split by it
-    if (fgDataMap != null) {
-      queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
-    }
-
-    // In search mode, reader will read multiple blocks by using a thread pool
-    CarbonRecordReader<CarbonRow> reader =
-        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport(), new Configuration());
-
-    // read all rows by the reader
-    List<CarbonRow> rows = new LinkedList<>();
-    try {
-      reader.initialize(mbSplit, null);
-
-      // loop to read required number of rows.
-      // By default, if user does not specify the limit value, limit is Long.MaxValue
-      while (reader.nextKeyValue() && rowCount < limit) {
-        rows.add(reader.getCurrentValue());
-        rowCount++;
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      reader.close();
-    }
-    LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
-        request.searchId(), rows.size()));
-    return rows;
-  }
-
-  /**
-   * If there is FGDataMap defined for this table and filter condition in the query,
-   * prune the splits by the DataMap and set the pruned split into the QueryModel and return
-   */
-  private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
-      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
-    Objects.requireNonNull(datamap);
-    List<Segment> segments = new LinkedList<>();
-    HashMap<String, Integer> uniqueSegments = new HashMap<>();
-    LoadMetadataDetails[] loadMetadataDetails =
-        SegmentStatusManager.readLoadMetadata(
-            CarbonTablePath.getMetadataPath(table.getTablePath()));
-    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
-      if (uniqueSegments.get(segmentId) == null) {
-        segments.add(Segment.toSegment(segmentId,
-            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-                loadMetadataDetails, FileFactory.getConfiguration())));
-        uniqueSegments.put(segmentId, 1);
-      } else {
-        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
-      }
-    }
-
-    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
-    List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
-    for (int i = 0; i < distributables.size(); i++) {
-      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
-      prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
-    }
-
-    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
-    for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
-      pathToRead.put(prunedBlocklet.getFilePath().replace('\\', '/'), prunedBlocklet);
-    }
-
-    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
-    List<TableBlockInfo> blockToRead = new LinkedList<>();
-    for (TableBlockInfo block : blocks) {
-      if (pathToRead.keySet().contains(block.getFilePath())) {
-        // If not set this, it will can't create FineGrainBlocklet object in
-        // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
-        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
-        blockToRead.add(block);
-      }
-    }
-    LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId,
-        blockToRead.size()));
-    queryModel.setTableBlockInfos(blockToRead);
-    queryModel.setFG(true);
-    return queryModel;
-  }
-
-  private QueryModel createQueryModel(CarbonTable table, SearchRequest request) {
-    String[] projectColumns = request.projectColumns();
-    Expression filter = null;
-    if (request.filterExpression() != null) {
-      filter = request.filterExpression();
-    }
-    return new QueryModelBuilder(table)
-        .projectColumns(projectColumns)
-        .filterExpression(filter)
-        .build();
-  }
-
-  /**
-   * create a failure response
-   */
-  private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
-    return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
-        new Object[0][]);
-  }
-
-  /**
-   * create a success response with result rows
-   */
-  private SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) {
-    Iterator<CarbonRow> itor = rows.iterator();
-    Object[][] output = new Object[rows.size()][];
-    int i = 0;
-    while (itor.hasNext()) {
-      output[i++] = itor.next().getData();
-    }
-    return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
deleted file mode 100644
index 71df3e0..0000000
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Status of RPC response
- */
-@InterfaceAudience.Internal
-public enum Status {
-  SUCCESS, FAILURE
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
deleted file mode 100644
index 97951ea..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search._
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.Distributable
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.store.worker.Status
-
-/**
- * Master of CarbonSearch.
- * It provides a Registry service for worker to register.
- * And it provides search API to fire RPC call to workers.
- */
-@InterfaceAudience.Internal
-class Master(sparkConf: SparkConf) {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  // worker host address map to EndpointRef
-
-  private val random = new Random
-
-  private var rpcEnv: RpcEnv = _
-
-  private val scheduler: Scheduler = new Scheduler
-
-  /** start service and listen on port passed in constructor */
-  def startService(): Unit = {
-    if (rpcEnv == null) {
-      LOG.info("Start search mode master thread")
-      val isStarted: AtomicBoolean = new AtomicBoolean(false)
-      new Thread(new Runnable {
-        override def run(): Unit = {
-          val hostAddress = InetAddress.getLocalHost.getHostAddress
-          var port = CarbonProperties.getSearchMasterPort
-          var exception: BindException = null
-          var numTry = 100 // we will try to create service at worse case 100 times
-          do {
-            try {
-              LOG.info(s"starting registry-service on $hostAddress:$port")
-              val config = RpcUtil.getRpcEnvConfig(
-                sparkConf, "registry-service", hostAddress, "", port,
-                new SecurityManager(sparkConf), clientMode = false)
-              rpcEnv = new NettyRpcEnvFactory().create(config)
-              numTry = 0
-            } catch {
-              case e: BindException =>
-                // port is occupied, increase the port number and try again
-                exception = e
-                LOG.error(s"start registry-service failed: ${e.getMessage}")
-                port = port + 1
-                numTry = numTry - 1
-            }
-          } while (numTry > 0)
-          if (rpcEnv == null) {
-            // we have tried many times, but still failed to find an available port
-            throw exception
-          }
-          val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
-          rpcEnv.setupEndpoint("registry-service", registryEndpoint)
-          if (isStarted.compareAndSet(false, false)) {
-            synchronized {
-              isStarted.compareAndSet(false, true)
-            }
-          }
-          LOG.info("registry-service started")
-          rpcEnv.awaitTermination()
-        }
-      }).start()
-      var count = 0
-      val countThreshold = 5000
-      while (isStarted.compareAndSet(false, false) && count < countThreshold) {
-        LOG.info(s"Waiting search mode master to start, retrying $count times")
-        Thread.sleep(10)
-        count = count + 1;
-      }
-      if (count >= countThreshold) {
-        LOG.error(s"Search mode try $countThreshold times to start master but failed")
-        throw new RuntimeException(
-          s"Search mode try $countThreshold times to start master but failed")
-      } else {
-        LOG.info("Search mode master started")
-      }
-    } else {
-      LOG.info("Search mode master has already started")
-    }
-  }
-
-  def stopService(): Unit = {
-    if (rpcEnv != null) {
-      rpcEnv.shutdown()
-      rpcEnv = null
-    }
-  }
-
-  def stopAllWorkers(): Unit = {
-    val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
-      (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
-    }
-    futures.foreach { case (address, future) =>
-      ThreadUtils.awaitResult(future, Duration.apply("10s"))
-      future.value match {
-        case Some(result) =>
-          result match {
-            case Success(response) => scheduler.removeWorker(address)
-            case Failure(throwable) => throw new IOException(throwable.getMessage)
-          }
-        case None => throw new ExecutionTimeoutException
-      }
-    }
-  }
-
-  /** A new searcher is trying to register, add it to the map and connect to this searcher */
-  def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
-    LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
-      s"with ${request.cores} cores")
-    val workerId = UUID.randomUUID().toString
-    val workerAddress = request.hostAddress
-    val workerPort = request.port
-    LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
-
-    val endPointRef =
-      rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
-    scheduler.addWorker(workerAddress,
-      new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
-    LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
-    RegisterWorkerResponse(workerId)
-  }
-
-  /**
-   * Execute search by firing RPC call to worker, return the result rows
-   * @param table table to search
-   * @param columns projection column names
-   * @param filter filter expression
-   * @param globalLimit max number of rows required in Master
-   * @param localLimit max number of rows required in Worker
-   * @return
-   */
-  def search(table: CarbonTable, columns: Array[String], filter: Expression,
-      globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
-    Objects.requireNonNull(table)
-    Objects.requireNonNull(columns)
-    if (globalLimit < 0 || localLimit < 0) {
-      throw new IllegalArgumentException("limit should be positive")
-    }
-
-    val queryId = random.nextInt
-    var rowCount = 0
-    val output = new ArrayBuffer[CarbonRow]
-
-    def onSuccess(result: SearchResult): Unit = {
-      // in case of RPC success, collect all rows in response message
-      if (result.queryId != queryId) {
-        throw new IOException(
-          s"queryId in response does not match request: ${result.queryId} != $queryId")
-      }
-      if (result.status != Status.SUCCESS.ordinal()) {
-        throw new IOException(s"failure in worker: ${ result.message }")
-      }
-
-      val itor = result.rows.iterator
-      while (itor.hasNext && rowCount < globalLimit) {
-        output += new CarbonRow(itor.next())
-        rowCount = rowCount + 1
-      }
-      LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
-    }
-    def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
-    def onTimedout() = throw new ExecutionTimeoutException()
-
-    // prune data and get a mapping of worker hostname to list of blocks,
-    // then add these blocks to the SearchRequest and fire the RPC call
-    val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
-    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
-      // Build a SearchRequest
-      val split = new SerializableWritable[CarbonMultiBlockSplit](
-        new CarbonMultiBlockSplit(blocks, splitAddress))
-      val request =
-        SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
-
-      // Find an Endpoind and send the request to it
-      // This RPC is non-blocking so that we do not need to wait before send to next worker
-      scheduler.sendRequestAsync[SearchResult](splitAddress, request)
-    }
-
-    // loop to get the result of each Worker
-    tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
-
-      // if we have enough data already, we do not need to collect more result
-      if (rowCount < globalLimit) {
-        // wait for worker
-        val timeout = CarbonProperties
-          .getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
-            CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
-        ThreadUtils.awaitResult(future, Duration.apply(timeout))
-        LOG.info(s"[SearchId:$queryId] receive search response from worker " +
-          s"${worker.address}:${worker.port}")
-        try {
-          future.value match {
-            case Some(response: Try[SearchResult]) =>
-              response match {
-                case Success(result) => onSuccess(result)
-                case Failure(e) => onFaiure(e)
-              }
-            case None => onTimedout()
-          }
-        } finally {
-          worker.workload.decrementAndGet()
-        }
-      }
-    }
-    output.toArray
-  }
-
-  /**
-   * Prune data by using CarbonInputFormat.getSplit
-   * Return a mapping of host address to list of block
-   */
-  private def pruneBlock(
-      table: CarbonTable,
-      columns: Array[String],
-      filter: Expression): JMap[String, JList[Distributable]] = {
-    val jobConf = new JobConf(new Configuration)
-    val job = new Job(jobConf)
-    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
-      job, table, columns, filter, null, null)
-
-    // We will do FG pruning in reader side, so don't do it here
-    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
-    val splits = format.getSplits(job)
-    val distributables = splits.asScala.map { split =>
-      split.asInstanceOf[Distributable]
-    }
-    CarbonLoaderUtil.nodeBlockMapping(
-      distributables.asJava,
-      -1,
-      getWorkers.asJava,
-      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
-      null)
-  }
-
-  /** return hostname of all workers */
-  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
-}
-
-// Exception if execution timed out in search mode
-class ExecutionTimeoutException extends RuntimeException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala b/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
deleted file mode 100644
index f15bb8f..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/RpcUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
-import org.apache.spark.util.Utils
-
-object RpcUtil {
-
-  def getRpcEnvConfig(conf: SparkConf,
-      name: String,
-      bindAddress: String,
-      advertiseAddress: String,
-      port: Int,
-      securityManager: SecurityManager,
-      clientMode: Boolean): RpcEnvConfig = {
-    val className = "org.apache.spark.rpc.RpcEnvConfig"
-    if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
-      createObject(className, conf, name, bindAddress,
-        advertiseAddress, port.asInstanceOf[Object],
-        securityManager, clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
-    } else if (SPARK_VERSION.startsWith("2.3")) {
-      // numUsableCores if it is 0 then spark will consider the available CPUs on the host.
-      val numUsableCores: Int = 0
-      createObject(className, conf, name, bindAddress,
-        advertiseAddress, port.asInstanceOf[Object],
-        securityManager, numUsableCores.asInstanceOf[Object],
-        clientMode.asInstanceOf[Object])._1.asInstanceOf[RpcEnvConfig]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
-  }
-
-  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
-    val clazz = Utils.classForName(className)
-    val ctor = clazz.getConstructors.head
-    ctor.setAccessible(true)
-    (ctor.newInstance(conArgs: _*), clazz)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
deleted file mode 100644
index 26208d0..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send request
- */
-private[rpc] class Scheduler {
-  // mapping of worker IP address to worker instance
-  private val workers = mutable.Map[String, Schedulable]()
-  private val random = new Random()
-
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Pick a Worker according to the address and workload of the Worker
-   * Invoke the RPC and return Future result
-   */
-  def sendRequestAsync[T: ClassTag](
-      splitAddress: String,
-      request: Any): (Schedulable, Future[T]) = {
-    require(splitAddress != null)
-    if (workers.isEmpty) {
-      throw new IOException("No worker is available")
-    }
-    var worker = pickWorker(splitAddress)
-
-    // check whether worker exceed max workload, if exceeded, pick next worker
-    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
-    var numTry = workers.size
-    do {
-      if (worker.workload.get() >= maxWorkload) {
-        LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
-        worker = pickNextWorker(worker)
-        numTry = numTry - 1
-      } else {
-        numTry = -1
-      }
-    } while (numTry > 0)
-    if (numTry == 0) {
-      // tried so many times and still not able to find Worker
-      throw new WorkerTooBusyException(
-        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
-    }
-    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
-    val future = worker.ref.ask(request)
-    worker.workload.incrementAndGet()
-    (worker, future)
-  }
-
-  private def pickWorker[T: ClassTag](splitAddress: String) = {
-    try {
-      workers(splitAddress)
-    } catch {
-      case e: NoSuchElementException =>
-        // no local worker available, choose one worker randomly
-        pickRandomWorker()
-    }
-  }
-
-  /** pick a worker randomly */
-  private def pickRandomWorker() = {
-    val index = random.nextInt(workers.size)
-    workers.toSeq(index)._2
-  }
-
-  /** pick the next worker of the input worker in the [[Scheduler.workers]] */
-  private def pickNextWorker(worker: Schedulable) = {
-    val index = workers.zipWithIndex.find { case ((address, w), index) =>
-      w == worker
-    }.get._2
-    if (index == workers.size - 1) {
-      workers.toSeq.head._2
-    } else {
-      workers.toSeq(index + 1)._2
-    }
-  }
-
-  /** A new searcher is trying to register, add it to the map and connect to this searcher */
-  def addWorker(address: String, schedulable: Schedulable): Unit = {
-    require(schedulable != null)
-    require(address.equals(schedulable.address))
-    workers(address) = schedulable
-  }
-
-  def removeWorker(address: String): Unit = {
-    workers.remove(address)
-  }
-
-  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
-}
-
-/**
- * Represent a Worker which [[Scheduler]] can send
- * Search request on it
- * @param id Worker ID, a UUID string
- * @param cores, number of cores in Worker
- * @param ref RPC endpoint reference
- * @param workload number of outstanding request sent to Worker
- */
-private[rpc] class Schedulable(
-    val id: String,
-    val address: String,
-    val port: Int,
-    val cores: Int,
-    val ref: RpcEndpointRef,
-    var workload: AtomicInteger) {
-  def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
-    this(id, address, port, cores, ref, new AtomicInteger())
-  }
-}
-
-class WorkerTooBusyException(message: String) extends RuntimeException(message)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
deleted file mode 100644
index 08baeeb..0000000
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.rpc.netty.NettyRpcEnvFactory
-import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher}
-import org.apache.spark.util.ThreadUtils
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-@InterfaceAudience.Internal
-object Worker {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  private val hostAddress = InetAddress.getLocalHost.getHostAddress
-  private var port: Int = _
-
-  def init(masterHostAddress: String, masterPort: Int): Unit = {
-    LOG.info(s"initializing worker...")
-    startService()
-    LOG.info(s"registering to master $masterHostAddress:$masterPort")
-    val workerId = registerToMaster(masterHostAddress, masterPort)
-    LOG.info(s"worker registered to master, workerId: $workerId")
-  }
-
-  /**
-   * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
-   */
-  private def startService(): Unit = {
-    new Thread(new Runnable {
-      override def run(): Unit = {
-        port = CarbonProperties.getSearchWorkerPort
-        val conf = new SparkConf()
-        var rpcEnv: RpcEnv = null
-        var exception: BindException = null
-        var numTry = 100 // we will try to create service at worse case 100 times
-        do {
-          try {
-            LOG.info(s"starting search-service on $hostAddress:$port")
-            val config = RpcUtil.getRpcEnvConfig(
-              conf, s"worker-$hostAddress", hostAddress, "", port,
-              new SecurityManager(conf), clientMode = false)
-            rpcEnv = new NettyRpcEnvFactory().create(config)
-            numTry = 0
-          } catch {
-            case e: BindException =>
-              // port is occupied, increase the port number and try again
-              exception = e
-              LOG.error(s"start search-service failed: ${e.getMessage}")
-              port = port + 1
-              numTry = numTry - 1
-          }
-        } while (numTry > 0)
-        if (rpcEnv == null) {
-          // we have tried many times, but still failed to find an available port
-          throw exception
-        }
-        val searchEndpoint: RpcEndpoint = new Searcher(rpcEnv)
-        rpcEnv.setupEndpoint("search-service", searchEndpoint)
-        LOG.info("search-service started")
-        rpcEnv.awaitTermination()
-      }
-    }).start()
-  }
-
-  private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
-    LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
-    val conf = new SparkConf()
-    val config = RpcUtil.getRpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
-      new SecurityManager(conf), clientMode = true)
-    val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
-
-    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
-      RpcAddress(masterHostAddress, masterPort), "registry-service")
-    val cores = Runtime.getRuntime.availableProcessors()
-
-    val request = RegisterWorkerRequest(hostAddress, port, cores)
-    val future = endPointRef.ask[RegisterWorkerResponse](request)
-    ThreadUtils.awaitResult(future, Duration.apply("10s"))
-    future.value match {
-      case Some(result) =>
-        result match {
-          case Success(response) =>
-            LOG.info("worker registered")
-            response.workerId
-          case Failure(throwable) =>
-            LOG.error(s"worker failed to registered: $throwable")
-            throw new IOException(throwable.getMessage)
-        }
-      case None =>
-        LOG.error("worker register timeout")
-        throw new ExecutionTimeoutException
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/search/Registry.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
deleted file mode 100644
index 22e766d..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Registry.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * Registry service implementation. It adds worker to master.
- */
-class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  override def onStart(): Unit = {
-    LOG.info("Registry Endpoint started")
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req@RegisterWorkerRequest(_, _, _) =>
-      val response = master.addWorker(req)
-      context.reply(response)
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("Registry Endpoint stopped")
-  }
-
-}
-
-case class RegisterWorkerRequest(
-    hostAddress: String,
-    port: Int,
-    cores: Int)
-
-case class RegisterWorkerResponse(
-    workerId: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
deleted file mode 100644
index 6fbea15..0000000
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.search
-
-import org.apache.spark.SerializableWritable
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.store.worker.SearchRequestHandler
-
-/**
- * Search service implementation
- */
-class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
-  override def onStart(): Unit = {
-    LOG.info("Searcher Endpoint started")
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case req: SearchRequest =>
-      val response = new SearchRequestHandler().handleSearch(req)
-      context.reply(response)
-
-    case req: ShutdownRequest =>
-      val response = new SearchRequestHandler().handleShutdown(req)
-      context.reply(response)
-
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("Searcher Endpoint stopped")
-  }
-}
-
-// Search request sent from master to worker
-case class SearchRequest(
-    searchId: Int,
-    split: SerializableWritable[CarbonMultiBlockSplit],
-    tableInfo: TableInfo,
-    projectColumns: Array[String],
-    filterExpression: Expression,
-    limit: Long)
-
-// Search result sent from worker to master
-case class SearchResult(
-    queryId: Int,
-    status: Int,
-    message: String,
-    rows: Array[Array[Object]])
-
-// Shutdown request sent from master to worker
-case class ShutdownRequest(
-    reason: String)
-
-// Shutdown response sent from worker to master
-case class ShutdownResponse(
-    status: Int,
-    message: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
----------------------------------------------------------------------
diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
deleted file mode 100644
index 88d925f..0000000
--- a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-public class SearchServiceTest {
-//  @Test
-//  public void testStartStopService() throws IOException, ExecutionException, InterruptedException {
-//    Master master = new Master(9999);
-//    master.startService();
-//
-//    Worker worker = Worker.getInstance();
-//    worker.init(InetAddress.getLocalHost().getHostName(), 9999);
-//
-//    Set<String> workers = master.getWorkers();
-//    Assert.assertEquals(1, workers.size());
-//    Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]);
-//
-//    master.stopAllWorkers();
-//    master.stopService();
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311c78bd/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
deleted file mode 100644
index 8780dc0..0000000
--- a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc
-
-import scala.concurrent.Future
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
-
-  var scheduler: Scheduler = _
-  var w1: Schedulable = _
-  var w2: Schedulable = _
-  var w3: Schedulable = _
-
-  override def beforeEach(): Unit = {
-    scheduler = new Scheduler()
-    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
-    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
-    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
-
-    scheduler.addWorker("1.1.1.1", w1)
-    scheduler.addWorker("1.1.1.2", w2)
-    scheduler.addWorker("1.1.1.3", w3)
-  }
-
-  test("test addWorker, removeWorker, getAllWorkers") {
-    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    scheduler.removeWorker("1.1.1.2")
-    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
-    scheduler.addWorker("1.1.1.4", w4)
-    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
-    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
-  }
-
-  test("test normal schedule") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r1.id)
-    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r2.id)
-    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r3.id)
-    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r4.id)
-    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r5.id)
-    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r6.id)
-  }
-
-  test("test worker unavailable") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
-    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
-  }
-
-  test("test reschedule when target worker is overload") {
-    // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    // it must be worker1 since worker3 exceed max workload
-    assertResult(w1.id)(r.id)
-  }
-
-  test("test all workers are overload") {
-    // by default, maxWorkload is number of core * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-  }
-
-  test("test user configured overload param") {
-    val original = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-
-    CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
-
-    (1 to 3).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    if (original != null) {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
-    }
-  }
-
-  test("test invalid property") {
-    intercept[IllegalArgumentException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
-    }
-    var value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-
-    intercept[NumberFormatException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
-    }
-    value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-  }
-}
-
-class DummyRef extends RpcEndpointRef(new SparkConf) {
-  override def address: RpcAddress = null
-
-  override def name: String = ""
-
-  override def send(message: Any): Unit = { }
-
-  override def ask[T](message: Any, timeout: RpcTimeout)
-      (implicit evidence$1: ClassTag[T]): Future[T] = null
-}
\ No newline at end of file