This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d287b8  [CARBONDATA-3582] support table status file backup
5d287b8 is described below

commit 5d287b82b631e65869fe2814be1f8f5c9183680c
Author: Jacky Li <jacky.li...@qq.com>
AuthorDate: Thu Nov 14 18:10:54 2019 +0800

    [CARBONDATA-3582] support table status file backup
    
    When overwriting table status file, if process crashed, table status
    file will be in corrupted state. This can happen in an unstable
    environment, like in the cloud. To prevent the table corruption, user
    can enable a newly added CarbonProperty to enable backup of the table
    status before overwriting it.
    
    New CarbonProperty: ENABLE_TABLE_STATUS_BACKUP (default is false)
    When enabling this property, "tablestatus.backup" file will be created
    in the same folder of "tablestatus" file
    
    This closes #3459
---
 .../core/constants/CarbonCommonConstants.java      |  11 ++
 .../core/statusmanager/SegmentStatusManager.java   | 135 ++++++++++++++-------
 .../carbondata/core/util/CarbonProperties.java     |   5 +
 docs/configuration-parameters.md                   |   1 +
 integration/spark2/pom.xml                         |   5 +
 .../spark/carbondata/TableStatusBackupTest.scala   |  76 ++++++++++++
 6 files changed, 190 insertions(+), 43 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index bd60a16..2e3e7d3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1296,6 +1296,17 @@ public final class CarbonCommonConstants {
   public static final String ENABLE_VECTOR_READER_DEFAULT = "true";
 
   /**
+   * In cloud object store scenario, overwriting table status file is not an 
atomic
+   * operation since it uses rename API. Thus, it is possible that table 
status is corrupted
+   * if process crashed when overwriting the table status file.
+   * To protect from file corruption, user can enable this property.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String ENABLE_TABLE_STATUS_BACKUP = 
"carbon.enable.tablestatus.backup";
+
+  public static final String ENABLE_TABLE_STATUS_BACKUP_DEFAULT = "false";
+
+  /**
    * property to set is IS_DRIVER_INSTANCE
    */
   @CarbonProperty
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index bc52082..2b76db1 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -254,18 +255,23 @@ public class SegmentStatusManager {
     }
   }
 
-  public static LoadMetadataDetails[] readTableStatusFile(String 
tableStatusPath)
-      throws IOException {
-    Gson gsonObjectToRead = new Gson();
+  /**
+   * Read file and return its content as string
+   *
+   * @param tableStatusPath path of the table status to read
+   * @return file content, null if file does not exist
+   * @throws IOException if IO errors
+   */
+  private static String readFileAsString(String tableStatusPath) throws 
IOException {
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
-    LoadMetadataDetails[] loadFolderDetails = null;
+
     AtomicFileOperations fileOperation =
         AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath);
 
     if (!FileFactory.isFileExist(tableStatusPath)) {
-      return new LoadMetadataDetails[0];
+      return null;
     }
 
     // When storing table status file in object store, reading of table status 
file may
@@ -277,8 +283,7 @@ public class SegmentStatusManager {
         dataInputStream = fileOperation.openForRead();
         inStream = new InputStreamReader(dataInputStream, 
Charset.forName(DEFAULT_CHARSET));
         buffReader = new BufferedReader(inStream);
-        loadFolderDetails = gsonObjectToRead.fromJson(buffReader, 
LoadMetadataDetails[].class);
-        retry = 0;
+        return buffReader.readLine();
       } catch (EOFException ex) {
         retry--;
         if (retry == 0) {
@@ -299,13 +304,23 @@ public class SegmentStatusManager {
         closeStreams(buffReader, inStream, dataInputStream);
       }
     }
+    return null;
+  }
 
-    // if listOfLoadFolderDetailsArray is null, return empty array
-    if (null == loadFolderDetails) {
+  /**
+   * Read table status file and decode to segment meta arrays
+   *
+   * @param tableStatusPath table status file path
+   * @return segment metadata
+   * @throws IOException if IO errors
+   */
+  public static LoadMetadataDetails[] readTableStatusFile(String 
tableStatusPath)
+      throws IOException {
+    String content = readFileAsString(tableStatusPath);
+    if (content == null) {
       return new LoadMetadataDetails[0];
     }
-
-    return loadFolderDetails;
+    return new Gson().fromJson(content, LoadMetadataDetails[].class);
   }
 
   /**
@@ -314,7 +329,7 @@ public class SegmentStatusManager {
    * @param loadMetadataDetails
    * @return
    */
-  public static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) 
{
+  private static int getMaxSegmentId(LoadMetadataDetails[] 
loadMetadataDetails) {
     int newSegmentId = -1;
     for (int i = 0; i < loadMetadataDetails.length; i++) {
       try {
@@ -525,45 +540,78 @@ public class SegmentStatusManager {
   }
 
   /**
-   * writes load details into a given file at @param dataLoadLocation
+   * Backup the table status file as 'tablestatus.backup' in the same path
    *
-   * @param dataLoadLocation
-   * @param listOfLoadFolderDetailsArray
-   * @throws IOException
+   * @param tableStatusPath table status file path
    */
-  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
+  private static void backupTableStatus(String tableStatusPath) throws 
IOException {
+    CarbonFile file = FileFactory.getCarbonFile(tableStatusPath);
+    if (file.exists()) {
+      String backupPath = tableStatusPath + ".backup";
+      String currentContent = readFileAsString(tableStatusPath);
+      if (currentContent != null) {
+        writeStringIntoFile(backupPath, currentContent);
+      }
+    }
+  }
+
+  /**
+   * writes load details to specified path
+   *
+   * @param tableStatusPath path of the table status file
+   * @param listOfLoadFolderDetailsArray segment metadata
+   * @throws IOException if IO errors
+   */
+  public static void writeLoadDetailsIntoFile(
+      String tableStatusPath,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
-    AtomicFileOperations fileWrite =
-        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
+    // When overwriting table status file, if process crashed, table status 
file
+    // will be in corrupted state. This can happen in an unstable environment,
+    // like in the cloud. To prevent the table corruption, user can enable 
following
+    // property to enable backup of the table status before overwriting it.
+    if (tableStatusPath.endsWith(CarbonTablePath.TABLE_STATUS_FILE) &&
+        CarbonProperties.isEnableTableStatusBackup()) {
+      backupTableStatus(tableStatusPath);
+    }
+    String content = new Gson().toJson(listOfLoadFolderDetailsArray);
+    mockForTest();
+    // make the table status file smaller by removing fields that are default 
value
+    for (LoadMetadataDetails loadMetadataDetails : 
listOfLoadFolderDetailsArray) {
+      loadMetadataDetails.removeUnnecessaryField();
+    }
+    // If process crashed during following write, table status file need to be
+    // manually recovered.
+    writeStringIntoFile(tableStatusPath, content);
+  }
+
+  // a dummy func for mocking in testcase, which simulates IOException
+  private static void mockForTest() throws IOException {
+  }
+
+  /**
+   * writes string content to specified path
+   *
+   * @param filePath path of the file to write
+   * @param content content to write
+   * @throws IOException if IO errors
+   */
+  private static void writeStringIntoFile(String filePath, String content) 
throws IOException {
+    AtomicFileOperations fileWrite = 
AtomicFileOperationFactory.getAtomicFileOperations(filePath);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
-    Gson gsonObjectToWrite = new Gson();
-    // write the updated data into the metadata file.
-
     try {
       dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(DEFAULT_CHARSET)));
-
-      // make the table status file smaller by removing fields that are 
default value
-      for (LoadMetadataDetails loadMetadataDetails : 
listOfLoadFolderDetailsArray) {
-        loadMetadataDetails.removeUnnecessaryField();
-      }
-
-      String metadataInstance = 
gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray);
-      brWriter.write(metadataInstance);
+      brWriter = new BufferedWriter(new OutputStreamWriter(
+          dataOutputStream, Charset.forName(DEFAULT_CHARSET)));
+      brWriter.write(content);
     } catch (IOException ioe) {
-      LOG.error("Error message: " + ioe.getLocalizedMessage());
+      LOG.error("Write file failed: " + ioe.getLocalizedMessage());
       fileWrite.setFailed();
       throw ioe;
     } finally {
-      if (null != brWriter) {
-        brWriter.flush();
-      }
       CarbonUtil.closeStreams(brWriter);
       fileWrite.close();
     }
-
   }
 
   /**
@@ -637,7 +685,7 @@ public class SegmentStatusManager {
    * @param invalidLoadTimestamps
    * @return invalidLoadTimestamps
    */
-  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier 
absoluteTableIdentifier,
+  private static List<String> updateDeletionStatus(AbsoluteTableIdentifier 
absoluteTableIdentifier,
       String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
       List<String> invalidLoadTimestamps, Long loadStartTime) {
     // For each load timestamp loop through data and if the
@@ -708,8 +756,7 @@ public class SegmentStatusManager {
    * @param newMetadata
    * @return
    */
-
-  public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
+  private static List<LoadMetadataDetails> updateLatestTableStatusDetails(
       LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
 
     List<LoadMetadataDetails> newListMetadata =
@@ -727,7 +774,7 @@ public class SegmentStatusManager {
    *
    * @param loadMetadata
    */
-  public static void updateSegmentMetadataDetails(LoadMetadataDetails 
loadMetadata) {
+  private static void updateSegmentMetadataDetails(LoadMetadataDetails 
loadMetadata) {
     // update status only if the segment is not marked for delete
     if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
       loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
@@ -893,7 +940,7 @@ public class SegmentStatusManager {
    * @param newList
    * @return
    */
-  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+  private static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
       LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
 
     List<LoadMetadataDetails> newListMetadata =
@@ -1024,7 +1071,9 @@ public class SegmentStatusManager {
               // update the metadata details from old to new status.
               List<LoadMetadataDetails> latestStatus =
                   updateLoadMetadataFromOldToNew(tuple2.details, 
latestMetadata);
-              writeLoadMetadata(identifier, latestStatus);
+              writeLoadDetailsIntoFile(
+                  
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()),
+                  latestStatus.toArray(new LoadMetadataDetails[0]));
             }
             updationCompletionStatus = true;
           } else {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c5e1dcc..35f11f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1132,6 +1132,11 @@ public final class CarbonProperties {
         
CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).equalsIgnoreCase("true");
   }
 
+  public static boolean isEnableTableStatusBackup() {
+    return 
getInstance().getProperty(CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP,
+        
CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP_DEFAULT).equalsIgnoreCase("true");
+  }
+
   /**
    * Validate the restrictions
    *
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 736670e..5df3c09 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -50,6 +50,7 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.lock.retry.timeout.sec | 5 | Specifies the interval between the 
retries to obtain the lock for any operation other than load. **NOTE:** Refer 
to ***carbon.lock.retries*** for understanding why CarbonData uses locks for 
operations. |
 | carbon.fs.custom.file.provider | None | To support FileTypeInterface for 
configuring custom CarbonFile implementation to work with custom FileSystem. |
 | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures 
which day of the week to be considered as first day of the week. Because first 
day of the week will be different in different parts of the world. |
+| carbon.enable.tablestatus.backup | false | In cloud object store scenario, 
overwriting table status file is not an atomic operation since it uses rename 
API. Thus, it is possible that table status is corrupted if process crashed 
when overwriting the table status file. To protect from file corruption, user 
can enable this property. |
 
 ## Data Loading Configuration
 
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9fe8498..7b65e0b 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -100,6 +100,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.jmockit</groupId>
+      <artifactId>jmockit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TableStatusBackupTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TableStatusBackupTest.scala
new file mode 100644
index 0000000..c1649b0
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TableStatusBackupTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.IOException
+
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TableStatusBackupTest extends QueryTest with BeforeAndAfterAll {
+  override protected def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP, "true")
+    sql("drop table if exists source")
+    sql("create table source(a string) stored as carbondata")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("drop table if exists source")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP, "false")
+  }
+
+  test("backup table status file") {
+    sql("insert into source values ('A'), ('B')")
+    val tablePath = CarbonEnv.getCarbonTable(None, 
"source")(sqlContext.sparkSession).getTablePath
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(tablePath)
+    val oldTableStatus = 
SegmentStatusManager.readTableStatusFile(tableStatusFilePath)
+
+    var mock = new MockUp[SegmentStatusManager]() {
+      @Mock
+      @throws[IOException]
+      def mockForTest(): Unit = {
+        throw new IOException("thrown in mock")
+      }
+    }
+
+    val exception = intercept[IOException] {
+      sql("insert into source values ('A'), ('B')")
+    }
+    assert(exception.getMessage.contains("thrown in mock"))
+    val backupPath = tableStatusFilePath + ".backup"
+    assert(FileFactory.isFileExist(backupPath))
+    val backupTableStatus = 
SegmentStatusManager.readTableStatusFile(backupPath)
+    assertResult(oldTableStatus)(backupTableStatus)
+
+    mock = new MockUp[SegmentStatusManager]() {
+      @Mock
+      def mockForTest(): Unit = {
+      }
+    }
+  }
+}
\ No newline at end of file

Reply via email to