[REBASE] resolve conflict after rebasing to master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/880bbceb Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/880bbceb Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/880bbceb Branch: refs/heads/carbonstore-rebase5 Commit: 880bbcebf967d71f6932793114134acacfd26b3f Parents: 111bb5c Author: Jacky Li <jacky.li...@qq.com> Authored: Tue Feb 27 08:51:25 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sun Mar 4 20:04:48 2018 +0800 ---------------------------------------------------------------------- .../core/datamap/dev/AbstractDataMapWriter.java | 5 ++-- .../core/datamap/dev/DataMapFactory.java | 2 +- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../SegmentUpdateStatusManager.java | 9 +----- .../datamap/examples/MinMaxDataMapFactory.java | 5 ++-- .../datamap/examples/MinMaxDataWriter.java | 7 +++-- .../testsuite/datamap/CGDataMapTestCase.scala | 26 ++++++++-------- .../testsuite/datamap/DataMapWriterSuite.scala | 19 ++++++------ .../testsuite/datamap/FGDataMapTestCase.scala | 31 +++++++++----------- .../iud/DeleteCarbonTableTestCase.scala | 2 +- .../TestInsertAndOtherCommandConcurrent.scala | 14 +++++---- .../StandardPartitionTableCleanTestCase.scala | 12 ++++---- .../carbondata/spark/util/DataLoadingUtil.scala | 2 +- .../datamap/DataMapWriterListener.java | 2 +- .../processing/merger/CarbonDataMergerUtil.java | 8 +---- .../merger/CompactionResultSortProcessor.java | 4 +-- .../merger/RowResultMergerProcessor.java | 5 ++-- .../partition/spliter/RowResultProcessor.java | 5 ++-- .../util/CarbonDataProcessorUtil.java | 4 +-- .../processing/util/CarbonLoaderUtil.java | 9 ------ 20 files changed, 73 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java index bcc9bad..de6dcb1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev; import java.io.IOException; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -35,10 +36,10 @@ public abstract class AbstractDataMapWriter { protected String writeDirectoryPath; - public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId, + public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, Segment segment, String writeDirectoryPath) { this.identifier = identifier; - this.segmentId = segmentId; + this.segmentId = segment.getSegmentNo(); this.writeDirectoryPath = writeDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index df5670d..50ac279 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -39,7 +39,7 @@ public interface DataMapFactory<T extends DataMap> { /** * Return a new write for this datamap */ - AbstractDataMapWriter createWriter(Segment segment); + AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath); /** * Get the datamap for segmentid http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index efe2b71..ee849bd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -72,7 +72,7 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory } @Override - public AbstractDataMapWriter createWriter(Segment segment) { + public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) { throw new UnsupportedOperationException("not implemented"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 94a4243..39eb262 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -260,13 +260,8 @@ public class SegmentUpdateStatusManager { /** * Returns all delta file paths of specified block - * - * @param tupleId - * @param extension - * @return - * @throws Exception */ - public List<String> getDeltaFiles(String tupleId, String extension) throws Exception { + private List<String> getDeltaFiles(String tupleId, String extension) throws Exception { try { String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); String completeBlockName = CarbonTablePath.addDataPartPrefix( @@ -405,10 +400,8 @@ public class SegmentUpdateStatusManager { public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) { String segmentPath = CarbonTablePath.getSegmentPath( identifier.getTablePath(), segmentId.getSegmentNo()); - CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); - for (SegmentUpdateDetails block : updateDetails) { if ((block.getBlockName().equalsIgnoreCase(blockName)) && (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo())) http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java index 266c107..4ef74a7 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java @@ -52,9 +52,8 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory { * @param segment * @return */ - @Override public AbstractDataMapWriter createWriter(Segment segment) { - return new MinMaxDataWriter(identifier, segment.getSegmentNo(), - CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())); + @Override public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) { + return new MinMaxDataWriter(identifier, segment, writeDirectoryPath); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java index fe0bbcf..5046182 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -52,11 +53,11 @@ public class MinMaxDataWriter extends AbstractDataMapWriter { private String dataWritePath; - public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId, + public MinMaxDataWriter(AbsoluteTableIdentifier identifier, Segment segment, String dataWritePath) { - super(identifier, segmentId, dataWritePath); + super(identifier, segment, dataWritePath); this.identifier = identifier; - this.segmentId = segmentId; + this.segmentId = segment.getSegmentNo(); this.dataWritePath = dataWritePath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index 4b6f231..1cbbcb4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} import org.apache.carbondata.core.datastore.FileReader import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.compression.SnappyCompressor import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.Blocklet +import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.scan.expression.Expression @@ -62,16 +62,16 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { /** * Return a new write for this datamap */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = { + new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName) } /** * Get the datamap for segmentid */ - override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = { + override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = { val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -108,9 +108,9 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { * * @return */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { + override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -125,7 +125,7 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { /** * Clears datamap of the segment */ - override def clear(segmentId: String): Unit = { + override def clear(segment: Segment): Unit = { } @@ -175,7 +175,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap { override def prune( filterExp: FilterResolverIntf, segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[Blocklet] = { + partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = { val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() val expression = filterExp.getFilterExpression getEqualToExpression(expression, buffer) @@ -184,7 +184,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap { } val meta = findMeta(value(0).getBytes) meta.map { f=> - new Blocklet(f._1, f._2+"") + new Blocklet(f._1, f._2 + "") }.asJava } @@ -219,10 +219,10 @@ class CGDataMap extends AbstractCoarseGrainDataMap { } class CGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, + segment: Segment, dataWritePath: String, dataMapName: String) - extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { + extends AbstractDataMapWriter(identifier, segment, dataWritePath) { var currentBlockId: String = null val cgwritepath = dataWritePath + "/" + http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index 2f8a1d1..7e93959 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -20,21 +20,19 @@ package org.apache.carbondata.spark.testsuite.datamap import java.util import scala.collection.JavaConverters._ + import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{DataFrame, SaveMode} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter +import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -49,15 +47,16 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory { override def fireEvent(event: Event): Unit = ??? - override def clear(segmentId: Segment): Unit = {} + override def clear(segment: Segment): Unit = {} override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ??? - override def getDataMaps(segmentId: Segment): util.List[DataMap] = ??? + override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ??? - override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock + override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = + DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath) override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava) @@ -175,9 +174,9 @@ object DataMapWriterSuite { var callbackSeq: Seq[String] = Seq[String]() - def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String, + def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment, dataWritePath: String) = - new AbstractDataMapWriter(identifier, segmentId, dataWritePath) { + new AbstractDataMapWriter(identifier, segment, dataWritePath) { override def onPageAdded( blockletId: Int, http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index d1bb65f..9c8cc15 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory} import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} import org.apache.carbondata.core.datastore.FileReader import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.compression.SnappyCompressor import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.FineGrainBlocklet +import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.scan.expression.Expression @@ -62,16 +62,16 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { /** * Return a new write for this datamap */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = { + new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName) } /** * Get the datamap for segmentid */ - override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = { + override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = { val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -99,9 +99,9 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { * * @return */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { + val file = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -112,7 +112,6 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { }.toList.asJava } - /** * * @param event @@ -124,7 +123,7 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { /** * Clears datamap of the segment */ - override def clear(segmentId: String): Unit = { + override def clear(segment: Segment): Unit = { } /** @@ -173,7 +172,7 @@ class FGDataMap extends AbstractFineGrainDataMap { override def prune( filterExp: FilterResolverIntf, segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = { + partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = { val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() val expression = filterExp.getFilterExpression getEqualToExpression(expression, buffer) @@ -187,7 +186,7 @@ class FGDataMap extends AbstractFineGrainDataMap { } private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int), - value: Array[Byte]): Option[FineGrainBlocklet] = { + value: Array[Byte]): Option[Blocklet] = { val bytes = FileReader.readByteArray(filePath, meta._4, meta._5) val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes)) val obj = new ObjectInputStream(outputStream) @@ -211,12 +210,10 @@ class FGDataMap extends AbstractFineGrainDataMap { pg.setRowId(f._2(p._2).toArray) pg } - pages Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava)) } else { None } - } private def findMeta(value: Array[Byte]) = { @@ -249,8 +246,8 @@ class FGDataMap extends AbstractFineGrainDataMap { } class FGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, dataWriterPath: String, dataMapName: String) - extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) { + segment: Segment, dataWriterPath: String, dataMapName: String) + extends AbstractDataMapWriter(identifier, segment, dataWriterPath) { var currentBlockId: String = null val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap" http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index d05f022..510903a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -194,7 +194,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("delete from update_status_files where age=5").show() val carbonTable = CarbonEnv .getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession) - val metaPath = carbonTable.getMetaDataFilepath + val metaPath = carbonTable.getMetadataPath val files = FileFactory.getCarbonFile(metaPath) val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.getClass if(result.getCanonicalName.contains("CarbonFileMetastore")) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 5550358..b39c44c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -269,7 +269,11 @@ object Global { class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { - override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { } + private var identifier: AbsoluteTableIdentifier = _ + + override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { + this.identifier = identifier + } override def fireEvent(event: Event): Unit = ??? @@ -277,12 +281,12 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ??? - override def getDataMaps(segmentId: Segment): util.List[DataMap] = ??? + override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ??? - override def createWriter(segmentId: Segment): AbstractDataMapWriter = { - new AbstractDataMapWriter { + override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = { + new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) { override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { } override def onBlockletEnd(blockletId: Int): Unit = { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala index f238d2b..cfc6983 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala @@ -52,14 +52,12 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, - carbonTable.getTablePath) - val partitions = CarbonFilters - .getPartitions(Seq.empty, - sqlContext.sparkSession, - TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) + val partitions = CarbonFilters.getPartitions( + Seq.empty, + sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) assert(partitions.get.length == partition) - val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath) + val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath)) val segLoad = details.find(_.getLoadName.equals(segmentId)).get val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile) assert(seg.getIndexFiles.size == indexes) http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index cee40c8..49e4420 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -441,7 +441,7 @@ object DataLoadingUtil { private def isUpdationRequired(isForceDeletion: Boolean, carbonTable: CarbonTable, - absoluteTableIdentifier: AbsoluteTableIdentifier) = { + absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = { val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) // Delete marked loads val isUpdationRequired = http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 5083ab5..1104229 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -74,7 +74,7 @@ public class DataMapWriterListener { } List<String> columns = factory.getMeta().getIndexedColumns(); List<AbstractDataMapWriter> writers = registry.get(columns); - AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null)); + AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath); if (writers != null) { writers.add(writer); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 4579c85..1ab803b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -1005,14 +1005,8 @@ public final class CarbonDataMergerUtil { /** * This method traverses Update Delta Files inside the seg and return true * if UpdateDelta Files are more than IUD Compaction threshold. - * - * @param seg - * @param identifier - * @param segmentUpdateStatusManager - * @param numberDeltaFilesThreshold - * @return */ - public static Boolean checkUpdateDeltaFilesInSeg(Segment seg, + private static Boolean checkUpdateDeltaFilesInSeg(Segment seg, AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index bce1b33..7435d73 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -406,8 +406,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { + carbonLoadModel.getFactTimeStamp() + ".tmp"; } else { carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(), - tableName, carbonLoadModel.getSegmentId()); + .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, + carbonLoadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 4aca13a..2616def 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -76,9 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel .getFactTimeStamp() + ".tmp"; } else { - carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getSegmentId()); + carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 92db4c5..221697f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -47,9 +47,8 @@ public class RowResultProcessor { CarbonDataProcessorUtil.createLocations(tempStoreLocation); this.segmentProperties = segProp; String tableName = carbonTable.getTableName(); - String carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getSegmentId()); + String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 64e50b0..efd715c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -393,8 +392,7 @@ public final class CarbonDataProcessorUtil { * * @return data directory path */ - public static String createCarbonStoreLocation(String factStoreLocation, - String databaseName, String tableName, String segmentId) { + public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/880bbceb/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 1f93ba1..b7aadd0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -459,15 +459,6 @@ public final class CarbonLoaderUtil { } - public static String readCurrentTime() { - SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); - String date = null; - - date = sdf.format(new Date()); - - return date; - } - public static boolean isValidEscapeSequence(String escapeChar) { return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) || escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||