kunal642 commented on a change in pull request #3281: [WIP]Index server
performance improvement
URL: https://github.com/apache/carbondata/pull/3281#discussion_r293681417
##
File path:
integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
##
@@ -19,98 +19,166 @@ package org.apache.carbondata.indexserver
import java.text.SimpleDateFormat
import java.util.Date
+import java.util.concurrent.Executors
import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor,
Future}
+import scala.concurrent.duration.Duration
-import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DistributableDataMapFormat
+import org.apache.carbondata.core.datamap.{DataMapDistributable,
DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet,
ExtendedBlockletWrapper}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
-class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int,
+idx: Int,
+val inputSplit: Seq[InputSplit],
+location: Array[String])
extends Partition {
override def index: Int = idx
override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+ def getLocations: Array[String] = {
+location
+ }
}
private[indexserver] class DistributedPruneRDD(@transient private val ss:
SparkSession,
dataMapFormat: DistributableDataMapFormat)
- extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
+ extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) {
@transient private val LOGGER =
LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
.getName)
-
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("MMddHHmm")
formatter.format(new Date())
}
-
- override protected def getPreferredLocations(split: Partition): Seq[String]
= {
-if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations !=
null) {
- split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
-} else {
- Seq()
+ var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]]
= _
+
+ private def clearInvalidDataMaps(segmentNo: List[String]): Unit = {
+if (dataMapFormat.isJobToClearDataMaps) {
+ if (StringUtils.isNotEmpty(dataMapFormat.getDataMapToClear)) {
+val dataMaps = DataMapStoreManager.getInstance
+ .getAllDataMap(dataMapFormat.getCarbonTable).asScala.collect {
+ case dataMap if dataMapFormat.getDataMapToClear
+.equalsIgnoreCase(dataMap.getDataMapSchema.getDataMapName) =>
+segmentNo.foreach(segment =>
dataMap.deleteSegmentDatamapData(segment))
+dataMap.clear()
+Nil
+ case others => List(others)
+}.flatten
+DataMapStoreManager.getInstance.getAllDataMaps
+ .put(dataMapFormat.getCarbonTable.getTableUniqueName,
dataMaps.asJava)
+ }
+ else {
Review comment:
fixed
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services