[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2021-03-01 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r584683576



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -440,6 +393,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file. In a segment
+   * folder we may have both .index files and .mergeindex files; as we are not deleting index files
+   * immediately for old tables, this method reads the mergeindex file, adds the mapped index files
+   * to a list, and returns it. If more than one mergeindex file is present, the latest one is
+   * considered valid.
+   * Ex: a segment has 3 files: Segment0/1.index, 1.mergeindex, 1.carbondata.
+   * 1.index has been merged into 1.mergeindex, so this method returns the merged index file:
+   * 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(indexFile);
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =

Review comment:
   Done
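   For illustration, a minimal Java sketch of the selection logic the javadoc above describes (the hunk is cut off by the archive, so the loop shape and the `getCarbonMergeFileToIndexFilesMap` accessor here are assumptions, not the PR's exact body):

   ```java
   // Sketch: treat the newest .mergeindex as valid; everything already merged
   // (older .mergeindex files and the .index files mapped inside the valid one)
   // is collected as invalid.
   for (String indexFile : indexFiles) {
     if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
       long modifiedTime = FileFactory.getCarbonFile(indexFile).getLastModifiedTime();
       if (modifiedTime > lastModifiedTime) {
         if (validIndexFile != null) {
           mergedAndInvalidIndexFiles.add(validIndexFile);  // older mergeindex loses
         }
         lastModifiedTime = modifiedTime;
         validIndexFile = indexFile;
       } else {
         mergedAndInvalidIndexFiles.add(indexFile);
       }
     }
   }
   if (validIndexFile != null) {
     indexFileStore.readMergeFile(validIndexFile);
     // the .index files recorded inside the valid mergeindex were merged away
     for (List<String> mapped : indexFileStore.getCarbonMergeFileToIndexFilesMap().values()) {
       mergedAndInvalidIndexFiles.addAll(mapped);
     }
   }
   ```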

##
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##
@@ -290,6 +290,7 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
 
     LoadMetadataDetails[] listOfLoadFolderDetailsArray =
         SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+    boolean isUpdateRequired = false;

Review comment:
   Done

##
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##
@@ -185,13 +193,23 @@ object CarbonMergeFilesRDD {
         val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
           CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
           segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        val uuid = String.valueOf(System.currentTimeMillis)
+        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
         // Merge all partition files into a single file.
-        val segmentFileName: String = SegmentFileStore
-          .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
-        SegmentFileStore
+        val segmentFile = SegmentFileStore
           .mergeSegmentFiles(readPath,
-            segmentFileName,
+            newSegmentFileName,
             CarbonTablePath.getSegmentFilesLocation(tablePath))
+        if (segmentFile != null) {
+          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
+            CarbonTablePath.SEGMENT_EXT)
+          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,
Review comment:
   This flow is called when merge index is triggered on old tables; the update happens only once.

##
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##
@@ -140,19 +141,47 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           .get
           .filterNot(streamingSegment.contains(_))
       }
+    validSegments.foreach { segment =>
+      if (segmentsToMerge.contains(segment.getSegmentNo)) {
+        val segmentFile = segment.getSegmentFileName
+        val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)

Review comment:
   Done.

##
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
##
@@ -85,7 +87,21 @@ class AlterTableMergeIndexSIEventListener
           .asScala
         val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
         validSegments.foreach { segment =>
-          validSegmentIds += segment.getSegmentNo
+          val segmentFile = segment.getSegmentFileName
+          val sfs = new SegmentFileStore(indexCarbonTable.getTablePath, segmentFile)

Review comment:
   Done.

##
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##
@@ -129,8 +127,13 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     String partitionTempPath = "";
     for (String partition : partitionInfo) {
       if (partitionPath.equalsIgnoreCase(partition)) {
-        partitionTempPath = partition + "/" + tempFolderPath;
-        break;
+        if (tempFolderPath !=

[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534763831



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -439,6 +430,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file. In a segment
+   * folder we may have both .index files and .mergeindex files; as we are not deleting index files
+   * immediately for old tables, this method reads the mergeindex file, adds the mapped index files
+   * to a list, and returns it. If more than one mergeindex file is present, the latest one is
+   * considered valid.
+   * Ex: a segment has 3 files: Segment0/1.index, 1.mergeindex, 1.carbondata.
+   * 1.index has been merged into 1.mergeindex, so this method returns the merged index file:
+   * 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();

Review comment:
   This method is called during read when the segment/table status file is not present or has been deleted, and when writing the segment data size into the tablestatus file, so it covers all old and new tables. For new tables it is also used to filter invalid files when stale data is present. Ex: SI load when the MT has stale index files; since the MT segment file/status file name is not yet updated during SI load, we read directly from the segment directory.
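   As a hedged sketch of such a read path (the surrounding variable names are illustrative, not from the PR):

   ```java
   // List the index/mergeindex files physically present in the segment directory,
   // then drop the ones reported as already merged or invalid.
   List<String> indexFiles = new ArrayList<>();
   for (CarbonFile file : FileFactory.getCarbonFile(segmentPath).listFiles()) {
     if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
         || file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
       indexFiles.add(file.getAbsolutePath());
     }
   }
   Set<String> invalid = SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
   indexFiles.removeIf(invalid::contains);  // only valid index files remain
   ```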









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534676791



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -162,15 +162,25 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
-    String tempFolderLoc = timeStamp + ".tmp";
-    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow,
+      boolean readFileFooterFromCarbonDataFile) throws IOException {
+    String tempFolderLoc;
+    String writePath;
+    if (!readFileFooterFromCarbonDataFile) {
+      tempFolderLoc = timeStamp + ".tmp";
+      writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+    } else {
+      // If Alter merge index for old tables is triggered,
+      // directly write the mergeindex file into the segment file location
+      tempFolderLoc = location;
+      writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+    }
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
     if (!carbonFile.exists()) {
       carbonFile.mkdirs();
     }
     CarbonFile tempFolder;
-    if (isMergeIndexFlow) {
+    if (isMergeIndexFlow || readFileFooterFromCarbonDataFile) {
Review comment:
   Done. Removed unnecessary code from this method.









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534678076



##
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##
@@ -226,85 +233,59 @@ object SecondaryIndexUtil {
           validSegmentsToUse.toList.asJava,
           indexCarbonTable)
       }
-      mergedSegments.asScala.map { seg =>
-        val file = SegmentFileStore.writeSegmentFile(
-          indexCarbonTable,
-          seg.getLoadName,
-          carbonLoadModel.getFactTimeStamp.toString,
-          null,
-          null)
-        val segment = new Segment(seg.getLoadName, file)
-        SegmentFileStore.updateTableStatusFile(indexCarbonTable,
-          seg.getLoadName,
-          file,
-          indexCarbonTable.getCarbonTableIdentifier.getTableId,
-          new SegmentFileStore(tablePath, segment.getSegmentFileName))
-        segment
-      }
-
-      val statusLock =
-        new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
-      try {
-        val retryCount = CarbonLockUtil.getLockProperty(CarbonCommonConstants
-          .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
-        val maxTimeout = CarbonLockUtil.getLockProperty(CarbonCommonConstants
-          .MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
-        if (statusLock.lockWithRetries(retryCount, maxTimeout)) {
-          val endTime = System.currentTimeMillis()
-          val loadMetadataDetails = SegmentStatusManager
-            .readLoadMetadata(indexCarbonTable.getMetadataPath)
-          loadMetadataDetails.foreach(loadMetadataDetail => {
-            if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
-              loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
-              loadMetadataDetail.setLoadEndTime(endTime)
-              CarbonLoaderUtil
-                .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
-                  loadMetadataDetail.getLoadName,
-                  indexCarbonTable)
-            }
-          })
-          SegmentStatusManager
-            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
-              loadMetadataDetails)
-        } else {
-          throw new RuntimeException(
-            "Not able to acquire the lock for table status updation for table " + databaseName +
-            "." + indexCarbonTable.getTableName)
-        }
-      } finally {
-        if (statusLock != null) {
-          statusLock.unlock()
-        }
-      }
-      // clear the indexSchema cache for the merged segments, as the index files and
-      // data files are rewritten after compaction
+      val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
+        scala.collection.mutable.Map()
       if (mergedSegments.size > 0) {

Review comment:
   Removed the check.









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677732



##
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##
@@ -267,15 +269,20 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         // Merge all partition files into a single file.
         segmentFileName =
           mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
-        val segmentFile = SegmentFileStore
-          .mergeSegmentFiles(readPath,
-            segmentFileName,
-            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
-        if (segmentFile != null) {
-          SegmentFileStore
-            .moveFromTempFolder(segmentFile,
-              carbonLoadModel.getFactTimeStamp + ".tmp",
-              carbonLoadModel.getTablePath)
+        if (!isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {

Review comment:
   Done

##
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##
@@ -167,6 +173,21 @@ object CarbonMergeFilesRDD {
         executorService.submit(new Runnable {
           override def run(): Unit = {
             ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+            // If Alter merge index for old tables is triggered, do not delete index files
+            // immediately to avoid index file not found during concurrent queries
+            if (readFileFooterFromCarbonDataFile ||
+                !isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+                  CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {

Review comment:
   Done

##
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##
@@ -226,85 +233,59 @@ object SecondaryIndexUtil {
           validSegmentsToUse.toList.asJava,
           indexCarbonTable)
       }
-      mergedSegments.asScala.map { seg =>
-        val file = SegmentFileStore.writeSegmentFile(
-          indexCarbonTable,
-          seg.getLoadName,
-          carbonLoadModel.getFactTimeStamp.toString,
-          null,
-          null)
-        val segment = new Segment(seg.getLoadName, file)
-        SegmentFileStore.updateTableStatusFile(indexCarbonTable,
-          seg.getLoadName,
-          file,
-          indexCarbonTable.getCarbonTableIdentifier.getTableId,
-          new SegmentFileStore(tablePath, segment.getSegmentFileName))
-        segment
-      }
-
-      val statusLock =
-        new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
-      try {
-        val retryCount = CarbonLockUtil.getLockProperty(CarbonCommonConstants
-          .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
-        val maxTimeout = CarbonLockUtil.getLockProperty(CarbonCommonConstants
-          .MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
-        if (statusLock.lockWithRetries(retryCount, maxTimeout)) {
-          val endTime = System.currentTimeMillis()
-          val loadMetadataDetails = SegmentStatusManager
-            .readLoadMetadata(indexCarbonTable.getMetadataPath)
-          loadMetadataDetails.foreach(loadMetadataDetail => {
-            if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
-              loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
-              loadMetadataDetail.setLoadEndTime(endTime)
-              CarbonLoaderUtil
-                .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
-                  loadMetadataDetail.getLoadName,
-                  indexCarbonTable)
-            }
-          })
-          SegmentStatusManager
-            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
-              loadMetadataDetails)
-        } else {
-          throw new RuntimeException(
-            "Not able to acquire the lock for table status updation for table " + databaseName +
-            "." + indexCarbonTable.getTableName)
-        }
-      } finally {
-        if (statusLock != null) {
-          statusLock.unlock()
-        }
-      }
-      // clear the indexSchema cache for the merged segments, as the index files and
-      // data files are rewritten after compaction
+      val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
+

[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677505



##
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##
@@ -241,13 +266,17 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
           break;
         }
       }
+      if (!table.isIndexTable()) {

Review comment:
   Done, maintained a mapping for SI and MT now.

##
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##
@@ -277,21 +305,18 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         LOGGER.error("unable to write segment file during merge index writing: " + ex.getMessage());
         throw ex;
       }
-      boolean status = SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
-          table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
-      if (!status) {
-        // revert to original segment file as the table status update has failed.
-        SegmentStatusManager.writeStringIntoFile(path, content);
-        // delete merge index file.
-        for (String file : mergeIndexFiles) {
-          FileFactory.getCarbonFile(file).delete();
-        }
-        // no need to delete index files, so return from here.
-        return uuid;
-      }
     }
-    for (CarbonFile file : indexFiles) {
-      file.delete();
+    boolean status = SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,

Review comment:
   `mergeIndexBasedOnSegmentFile` is called when an old store is used (here, the segment detail is already loaded as SUCCESS in tablestatus); it only modifies the size details and the segment file name.
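   A minimal sketch of that update, assuming the entry already exists as SUCCESS (an outline using accessors seen elsewhere in this PR, not the method body; `setSegmentFile` is assumed):

   ```java
   // Refresh only the size details and the segment file name of the existing entry.
   LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
   for (LoadMetadataDetails detail : details) {
     if (detail.getLoadName().equals(segmentId)) {
       detail.setSegmentFile(newSegmentFileName + CarbonTablePath.SEGMENT_EXT);  // assumed setter
       CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(detail, segmentId, table);
     }
   }
   SegmentStatusManager.writeLoadDetailsIntoFile(
       CarbonTablePath.getTableStatusFilePath(table.getTablePath()), details);
   ```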









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677231



##
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##
@@ -672,7 +678,7 @@ public static boolean isMaxQueryTimeoutExceeded(long fileTimestamp) {
 
     long minutesElapsed = (difference / (1000 * 60));
 
-    return minutesElapsed > maxTime;
+    return minutesElapsed >= maxTime;

Review comment:
   done

##
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##
@@ -2558,8 +2558,11 @@ public static long getCarbonIndexSize(SegmentFileStore fileStore,
   // Get the total size of carbon data and the total size of carbon index
   public static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
       Segment segment) throws IOException {
+    SegmentFileStore fileStore = null;
     if (segment.getSegmentFileName() != null) {
-      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+    }
+    if (segment.getSegmentFileName() != null && fileStore.getSegmentFile() != null) {

Review comment:
   Done









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677179



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -664,7 +723,8 @@ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePa
       CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
         @Override
         public boolean accept(CarbonFile file) {
-          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+          return file.getName().contains(uuid) && CarbonTablePath

Review comment:
   Removed this part, it is not required.









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677017



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -439,6 +430,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**

Review comment:
   For SI, the flow is: segment in progress → merge index on MT → call SI load → merge index on SI → writeSegmentFile for SI → SI success → writeSegmentFile for MT → MT success.
   Here, SI reads index files from the segment path because the MT segment file is not yet written.
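   Spelled out as pseudocode (the orchestration method and helper names below are hypothetical; the real flow spans several listeners in this PR):

   ```java
   // Ordering of the SI load flow described above.
   void loadWithSecondaryIndex(CarbonTable mainTable, CarbonTable siTable, String segmentId) {
     // MT segment is still IN_PROGRESS at this point
     mergeIndexFiles(mainTable, segmentId);   // merge index on MT
     loadSISegment(siTable, segmentId);       // SI reads .index files from the MT segment
                                              // path, as no MT segment file exists yet
     mergeIndexFiles(siTable, segmentId);     // merge index on SI
     writeSegmentFile(siTable, segmentId);    // write SI segment file, mark SI SUCCESS
     writeSegmentFile(mainTable, segmentId);  // write MT segment file, mark MT SUCCESS
   }
   ```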









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534676835



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -198,20 +208,17 @@ public boolean accept(CarbonFile file) {
         folderDetails.setRelative(isRelative);
         folderDetails.setPartitions(partitionNames);
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
-        for (CarbonFile file : carbonFiles) {
-          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-            folderDetails.setMergeFileName(file.getName());
-          } else {
-            folderDetails.getFiles().add(file.getName());
-          }
-        }
+        setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
         segmentFile.addPath(location, folderDetails);
         String path = null;
         if (isMergeIndexFlow) {
           // in case of merge index flow, tasks are launched per partition and all the tasks
           // will be written to the same tmp folder, in that case taskNo is not unique.
           // To generate a unique fileName UUID is used
           path = writePath + "/" + CarbonUtil.generateUUID() + CarbonTablePath.SEGMENT_EXT;
+          if (readFileFooterFromCarbonDataFile) {
+            path = writePath + "/" + timeStamp + CarbonTablePath.SEGMENT_EXT;

Review comment:
   Removed unnecessary code from this method.









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534676791



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -162,15 +162,25 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
-    String tempFolderLoc = timeStamp + ".tmp";
-    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow,
+      boolean readFileFooterFromCarbonDataFile) throws IOException {
+    String tempFolderLoc;
+    String writePath;
+    if (!readFileFooterFromCarbonDataFile) {
+      tempFolderLoc = timeStamp + ".tmp";
+      writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+    } else {
+      // If Alter merge index for old tables is triggered,
+      // directly write the mergeindex file into the segment file location
+      tempFolderLoc = location;
+      writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+    }
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
     if (!carbonFile.exists()) {
       carbonFile.mkdirs();
     }
     CarbonFile tempFolder;
-    if (isMergeIndexFlow) {
+    if (isMergeIndexFlow || readFileFooterFromCarbonDataFile) {

Review comment:
   Removed unnecessary code from this method.









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-12-02 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534676638



##
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##
@@ -162,15 +162,25 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
-    String tempFolderLoc = timeStamp + ".tmp";
-    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow,
+      boolean readFileFooterFromCarbonDataFile) throws IOException {

Review comment:
   Done









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-11-12 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r522092928



##
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  override protected def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "0")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+        CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME.toString)
+  }
+
+  test("test clean files command for index files") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command for index files with SI") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date, name string) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01', 'abc' ")
+    sql("create index indextable1 on table cleantest (name) AS 'carbondata'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentPath("default_indextable1", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    assert(getIndexFileCountFromSegmentPath("default_indextable1", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command on partition table") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date) STORED AS carbondata " +
+        "partitioned by (name string)")
+    sql("insert into table cleantest select '1','2000-02-01', 'abc' ")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command for index files after update") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, name string) STORED AS carbondata")
+    sql("insert into table cleantest select '1', 'abc' ")
+    sql("insert into table cleantest select '2', 'abc' ")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "1") == 1)
+    sql("update cleantest set (name)=('xyz') where id=2")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "1") == 2)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "1") == 1)
+    checkAnswer(sql("select * from cleantest where id=2"), Seq(Row(2, "xyz")))
+  }
+
+  test("test clean files without mergeindex") {
+

[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-11-10 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r520572527



##
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##
@@ -79,6 +79,11 @@
    */
   private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
 
+  /**
+   * Stores the list of invalid index files of the SI segments in case of small files.
+   */
+  private static Set<String> oldSIIndexAndMergeIndexFiles = new HashSet<>();

Review comment:
   Removed the usage of `oldSIIndexAndMergeIndexFiles`; as #3999 merges and writes the segment file based on a UUID (timestamp), the list is no longer needed for these steps.
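   For reference, the UUID in question is just the fact timestamp, as in the `CarbonMergeFilesRDD` hunk earlier in this thread:

   ```java
   // The segment file name is derived from the segment id plus a timestamp-based
   // UUID, so a rewrite never collides with the previous segment file.
   String uuid = String.valueOf(System.currentTimeMillis());
   String newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid);
   ```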









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-11-10 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r520568975



##
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##
@@ -274,20 +274,22 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
 
     LoadMetadataDetails[] listOfLoadFolderDetailsArray =
         SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
+    Boolean isUpdateRequired = false;

Review comment:
   done









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-11-10 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r520568875



##
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  override protected def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "0")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+        CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME.toString)
+  }
+
+  test("test clean files command for index files") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command for index files with SI") {

Review comment:
   OK, done.









[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

2020-11-03 Thread GitBox


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r515750468



##
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override
       public boolean accept(CarbonFile file) {
-        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
+        return (!oldIndexAndMergeIndexFiles.contains(file.getAbsolutePath()) && (
+            file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+                .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
 
+  public static List<String> getOldIndexAndMergeIndexFiles() {
+    return oldIndexAndMergeIndexFiles;
+  }
+
+  public static void setOldIndexAndMergeIndexFiles(List<String> oldIndexAndMergeIndexFiles) {

Review comment:
   While writing the merge index file/segment file, it gets the index files from the segment directory. During the SI small-files merge step we will have `old.index` and `new.index` files, and only `new.index` is valid for the merge step and for writing the segment file. For the main table we could also store and use such a list for a normal load, but in a few cases, like add segment or reading from an external location, we have to read the mergeindex file to identify the invalid index files. For clean files, I get all index files from the segment directory and eliminate the ones that are not present in the segment file.
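   A hedged sketch of that clean-files filtering (`getIndexOrMergeFiles` is the accessor used elsewhere in this PR; the surrounding loop is an assumption):

   ```java
   // Delete physical index files that the segment file no longer references.
   SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
   Set<String> referenced = fileStore.getIndexOrMergeFiles().keySet();
   String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
   for (CarbonFile file : FileFactory.getCarbonFile(segmentPath).listFiles()) {
     boolean isIndexFile = file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
         || file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
     if (isIndexFile && !referenced.contains(file.getAbsolutePath())) {
       file.delete();  // stale file, already merged into a mergeindex
     }
   }
   ```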

##
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {

Review comment:
   OK, removed.

##
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {

Review comment:
   done

##
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);
+    List<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(carbonIndexFiles);
     for (String indexPath : index) {

Review comment:
   done

##
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);

Review comment:
   Done. `getInvalidAndMergedIndexFiles` takes a `List` as argument and returns a `List`.

##
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
           indexFilesInPartition.add(indexCarbonFile);
         }
       }
-      indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+      indexFiles = indexFilesInPartition;
     } else {
-      indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+      indexFiles = indexCarbonFiles;
    }
+    }
+    if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+      return