[GitHub] [carbondata] kunal642 commented on a change in pull request #3281: [WIP]Index server performance improvement

2019-06-17 Thread GitBox
kunal642 commented on a change in pull request #3281: [WIP]Index server performance improvement
URL: https://github.com/apache/carbondata/pull/3281#discussion_r294256259
 
 

 ##
 File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
 ##
 @@ -1670,4 +1706,10 @@ public int getNumberOfHandlersForIndexServer() {
     return CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT;
   }
 
+  public int getNumOfThreadsForExecutorPruning() {
+    return Integer.parseInt(CarbonProperties.getInstance()
 
 Review comment:
   done
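
The hunk above is cut off after the first line of the new getter, so only the start of its parse-with-fallback pattern is visible. As a rough illustration of that pattern, here is a minimal Scala sketch; the property key and the default of 4 are assumptions for illustration only, not values taken from the PR.

object ExecutorPruningThreadsSketch {
  // Assumed key and default; the real constants would live in CarbonCommonConstants.
  private val PruningThreadsKey = "carbon.executor.pruning.threads"
  private val PruningThreadsDefault = 4

  // Parse the configured thread count, falling back to the default when the
  // property is missing or not a valid integer.
  def numThreadsForExecutorPruning(props: java.util.Properties): Int = {
    try {
      Integer.parseInt(props.getProperty(PruningThreadsKey, PruningThreadsDefault.toString))
    } catch {
      case _: NumberFormatException => PruningThreadsDefault
    }
  }
}

A caller would typically size a fixed thread pool from this value, for example Executors.newFixedThreadPool(numThreadsForExecutorPruning(props)).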




[GitHub] [carbondata] kunal642 commented on a change in pull request #3281: [WIP]Index server performance improvement

2019-06-14 Thread GitBox
kunal642 commented on a change in pull request #3281: [WIP]Index server performance improvement
URL: https://github.com/apache/carbondata/pull/3281#discussion_r293681417
 
 

 ##
 File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
 ##
 @@ -19,98 +19,166 @@ package org.apache.carbondata.indexserver
 
 import java.text.SimpleDateFormat
 import java.util.Date
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
 
-import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
 import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DistributableDataMapFormat
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int,
+    idx: Int,
+    val inputSplit: Seq[InputSplit],
+    location: Array[String])
   extends Partition {
 
   override def index: Int = idx
 
   override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+  def getLocations: Array[String] = {
+    location
+  }
 }
 
 private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkSession,
     dataMapFormat: DistributableDataMapFormat)
-  extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
+  extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) {
 
   @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
     .getName)
-
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("MMddHHmm")
     formatter.format(new Date())
   }
-
-  override protected def getPreferredLocations(split: Partition): Seq[String] = {
-    if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != null) {
-      split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
-    } else {
-      Seq()
+  var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
+
+  private def clearInvalidDataMaps(segmentNo: List[String]): Unit = {
+    if (dataMapFormat.isJobToClearDataMaps) {
+      if (StringUtils.isNotEmpty(dataMapFormat.getDataMapToClear)) {
+        val dataMaps = DataMapStoreManager.getInstance
+          .getAllDataMap(dataMapFormat.getCarbonTable).asScala.collect {
+          case dataMap if dataMapFormat.getDataMapToClear
+            .equalsIgnoreCase(dataMap.getDataMapSchema.getDataMapName) =>
+            segmentNo.foreach(segment => dataMap.deleteSegmentDatamapData(segment))
+            dataMap.clear()
+            Nil
+          case others => List(others)
+        }.flatten
+        DataMapStoreManager.getInstance.getAllDataMaps
+          .put(dataMapFormat.getCarbonTable.getTableUniqueName, dataMaps.asJava)
+      }
+      else {
 
 Review comment:
   fixed
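
The new imports in this hunk (Executors, ExecutionContext, Future, Await, Duration, CarbonThreadFactory) point at a Future-based, bounded-thread-pool pruning path on the executor side. Below is a minimal sketch of that general pattern only; the method name pruneSplit, the generic signature, and the pool sizing are placeholders, not the PR's actual API.

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration

object ParallelPruneSketch {
  // Prune each split on a bounded pool and collect the results. `pruneSplit`
  // stands in for whatever per-split pruning work the RDD actually performs.
  def pruneInParallel[A, B](splits: Seq[A], numThreads: Int)(pruneSplit: A => Seq[B]): Seq[B] = {
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)
    try {
      // One Future per split; wait for all of them before returning.
      val futures = splits.map(split => Future(pruneSplit(split)))
      Await.result(Future.sequence(futures), Duration.Inf).flatten
    } finally {
      pool.shutdown()
    }
  }
}

Whether the real implementation names its pool threads via CarbonThreadFactory or bounds the wait with a configurable timeout is up to the PR; this only illustrates the Future/Await wiring implied by the imports.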

