[CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module. To allow assembling the JAR of the store-sdk module, the module should not depend on the carbon-spark module.
This closes #1970 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0d50f654 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0d50f654 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0d50f654 Branch: refs/heads/carbonstore Commit: 0d50f65461ae3855db66f44fa06e01174de50ccd Parents: a848ccf Author: Jacky Li <jacky.li...@qq.com> Authored: Sun Feb 11 21:37:04 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Mon Feb 12 16:06:49 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/carbondata/common/Maps.java | 39 ++ .../org/apache/carbondata/common/Strings.java | 3 + .../ConcurrentOperationException.java | 56 +++ .../exceptions/TableStatusLockException.java | 34 ++ .../sql/InvalidLoadOptionException.java | 33 ++ .../sql/MalformedCarbonCommandException.java | 75 +++ .../sql/MalformedDataMapCommandException.java | 37 ++ .../exceptions/sql/NoSuchDataMapException.java | 39 ++ .../statusmanager/SegmentStatusManager.java | 124 +++++ .../carbondata/core/util/DeleteLoadFolders.java | 200 ++++++++ .../preaggregate/TestPreAggCreateCommand.scala | 2 +- .../preaggregate/TestPreAggregateDrop.scala | 2 +- .../timeseries/TestTimeSeriesCreateTable.scala | 2 +- .../timeseries/TestTimeSeriesDropSuite.scala | 2 +- .../TestTimeseriesTableSelection.scala | 2 +- .../TestDataLoadWithColumnsMoreThanSchema.scala | 3 +- .../dataload/TestGlobalSortDataLoad.scala | 2 +- .../TestLoadDataWithDiffTimestampFormat.scala | 2 +- .../TestLoadDataWithFileHeaderException.scala | 11 +- ...ataWithMalformedCarbonCommandException.scala | 3 +- .../testsuite/dataload/TestLoadOptions.scala | 2 +- .../dataload/TestTableLevelBlockSize.scala | 4 +- .../testsuite/datamap/TestDataMapCommand.scala | 2 +- .../dataretention/DataRetentionTestCase.scala | 2 +- .../spark/testsuite/datetype/DateTypeTest.scala | 2 +- .../testsuite/sortcolumns/TestSortColumns.scala 
| 3 +- integration/spark-common/pom.xml | 5 - .../exception/ConcurrentOperationException.java | 44 -- .../MalformedCarbonCommandException.java | 69 --- .../MalformedDataMapCommandException.java | 32 -- .../spark/exception/NoSuchDataMapException.java | 33 -- .../org/apache/carbondata/api/CarbonStore.scala | 6 +- .../spark/CarbonColumnValidator.scala | 8 +- .../carbondata/spark/load/ValidateUtil.scala | 71 --- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +- .../carbondata/spark/util/CommonUtil.scala | 56 +-- .../carbondata/spark/util/DataLoadingUtil.scala | 451 ------------------- .../spark/util/GlobalDictionaryUtil.scala | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala | 4 +- .../spark/rdd/CarbonTableCompactor.scala | 2 +- .../org/apache/spark/sql/CarbonSource.scala | 2 +- .../datamap/CarbonCreateDataMapCommand.scala | 2 +- .../datamap/CarbonDropDataMapCommand.scala | 2 +- .../CarbonAlterTableCompactionCommand.scala | 11 +- .../management/CarbonLoadDataCommand.scala | 18 +- .../CarbonProjectForDeleteCommand.scala | 2 +- .../CarbonProjectForUpdateCommand.scala | 2 +- .../command/mutation/IUDCommonUtil.scala | 2 +- .../CreatePreAggregateTableCommand.scala | 3 +- .../preaaggregate/PreAggregateUtil.scala | 2 +- .../schema/CarbonAlterTableRenameCommand.scala | 2 +- .../command/timeseries/TimeSeriesUtil.scala | 2 +- .../datasources/CarbonFileFormat.scala | 15 +- .../sql/execution/strategy/DDLStrategy.scala | 2 +- .../strategy/StreamingTableStrategy.scala | 2 +- .../execution/command/CarbonHiveCommands.scala | 2 +- .../sql/parser/CarbonSpark2SqlParser.scala | 2 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 2 +- .../org/apache/spark/util/AlterTableUtil.scala | 2 +- .../org/apache/spark/util/TableAPIUtil.scala | 2 +- .../spark/sql/hive/CarbonSessionState.scala | 2 +- .../segmentreading/TestSegmentReading.scala | 2 +- .../spark/util/AllDictionaryTestCase.scala | 4 +- 
.../util/ExternalColumnDictionaryTestCase.scala | 6 +- .../TestStreamingTableOperation.scala | 2 +- .../bucketing/TableBucketingTestCase.scala | 2 +- .../vectorreader/AddColumnTestCases.scala | 2 +- .../loading/model/CarbonLoadModel.java | 13 +- .../loading/model/CarbonLoadModelBuilder.java | 322 +++++++++++++ .../processing/loading/model/LoadOption.java | 251 +++++++++++ .../processing/util/CarbonLoaderUtil.java | 84 +--- .../processing/util/DeleteLoadFolders.java | 200 -------- store/sdk/pom.xml | 2 +- .../sdk/file/CarbonWriterBuilder.java | 15 +- .../sdk/file/CSVCarbonWriterSuite.java | 2 +- .../streaming/StreamSinkFactory.scala | 11 +- 77 files changed, 1330 insertions(+), 1146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/Maps.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/Maps.java b/common/src/main/java/org/apache/carbondata/common/Maps.java new file mode 100644 index 0000000..14fc329 --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/Maps.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common; + +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +@InterfaceAudience.Developer +public class Maps { + + /** + * Return value if key is contained in the map, else return defauleValue. + * This is added to avoid JDK 8 dependency + */ + public static <K, V> V getOrDefault(Map<K, V> map, K key, V defaultValue) { + V value = map.get(key); + if (value != null) { + return value; + } else { + return defaultValue; + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/Strings.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java b/common/src/main/java/org/apache/carbondata/common/Strings.java index 23288dd..08fdc3c 100644 --- a/common/src/main/java/org/apache/carbondata/common/Strings.java +++ b/common/src/main/java/org/apache/carbondata/common/Strings.java @@ -19,6 +19,9 @@ package org.apache.carbondata.common; import java.util.Objects; +import org.apache.carbondata.common.annotations.InterfaceAudience; + +@InterfaceAudience.Developer public class Strings { /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java new file mode 100644 index 0000000..a14d161 --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common.exceptions; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown when executing concurrent operations which + * is not supported in carbon. + * + * For example, when INSERT OVERWRITE is executing, other operations are not + * allowed, so this exception will be thrown + */ +@InterfaceAudience.User +@InterfaceStability.Stable +public class ConcurrentOperationException extends Exception { + + /** + * The Error message. + */ + private String msg; + + /** + * Constructor + * + * @param msg The error message for this exception. 
+ */ + public ConcurrentOperationException(String msg) { + super(msg); + this.msg = msg; + } + + /** + * getMessage + */ + public String getMessage() { + return this.msg; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java new file mode 100644 index 0000000..89cfd46 --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.common.exceptions; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown when failed to acquire lock for table status metadata, + * or re-try timed out + */ +@InterfaceAudience.User +@InterfaceStability.Evolving +public class TableStatusLockException extends RuntimeException { + + public TableStatusLockException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java new file mode 100644 index 0000000..41b2434 --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.common.exceptions.sql; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown when loading option is invalid for SQL + * loading statement (LOAD DATA, INSERT INTO) + */ +@InterfaceAudience.User +@InterfaceStability.Stable +public class InvalidLoadOptionException extends MalformedCarbonCommandException { + public InvalidLoadOptionException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java new file mode 100644 index 0000000..5fe3ce8 --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.common.exceptions.sql; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown if any validation fails then parsing + * SQL statement. + */ +@InterfaceAudience.User +@InterfaceStability.Stable +public class MalformedCarbonCommandException extends Exception { + + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + /** + * The Error message. + */ + private String msg = ""; + + /** + * Constructor + * + * @param msg The error message for this exception. + */ + public MalformedCarbonCommandException(String msg) { + super(msg); + this.msg = msg; + } + + /** + * Constructor + * + * @param msg The error message for this exception. + */ + public MalformedCarbonCommandException(String msg, Throwable t) { + super(msg, t); + this.msg = msg; + } + + /** + * getLocalizedMessage + */ + @Override + public String getLocalizedMessage() { + return super.getLocalizedMessage(); + } + + /** + * getMessage + */ + public String getMessage() { + return this.msg; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java new file mode 100644 index 0000000..7c25b2c --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common.exceptions.sql; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown when Datamap related SQL statement is invalid + */ +@InterfaceAudience.User +@InterfaceStability.Stable +public class MalformedDataMapCommandException extends MalformedCarbonCommandException { + /** + * default serial version ID. 
+ */ + private static final long serialVersionUID = 1L; + + public MalformedDataMapCommandException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java new file mode 100644 index 0000000..7ab9048 --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.common.exceptions.sql; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown if datamap is not found when executing datamap + * related SQL statement + */ +@InterfaceAudience.User +@InterfaceStability.Stable +public class NoSuchDataMapException extends MalformedCarbonCommandException { + + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + public NoSuchDataMapException(String dataMapName, String tableName) { + super("Datamap with name " + dataMapName + " does not exist under table " + tableName); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index c613735..2e73aef 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.carbondata.common.exceptions.TableStatusLockException; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -46,6 +47,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.util.CarbonUtil; +import 
org.apache.carbondata.core.util.DeleteLoadFolders; import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; @@ -708,4 +710,126 @@ public class SegmentStatusManager { } } + private static boolean isLoadDeletionRequired(String metaDataLocation) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + if (details != null && details.length > 0) { + for (LoadMetadataDetails oneRow : details) { + if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus() + || SegmentStatus.COMPACTED == oneRow.getSegmentStatus() + || SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus() + || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus()) + && oneRow.getVisibility().equalsIgnoreCase("true")) { + return true; + } + } + } + return false; + } + + /** + * This will update the old table status details before clean files to the latest table status. + * @param oldList + * @param newList + * @return + */ + public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew( + LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) { + + List<LoadMetadataDetails> newListMetadata = + new ArrayList<LoadMetadataDetails>(Arrays.asList(newList)); + for (LoadMetadataDetails oldSegment : oldList) { + if ("false".equalsIgnoreCase(oldSegment.getVisibility())) { + newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false"); + } + } + return newListMetadata; + } + + private static void writeLoadMetadata(AbsoluteTableIdentifier identifier, + List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { + String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); + + DataOutputStream dataOutputStream; + Gson gsonObjectToWrite = new Gson(); + BufferedWriter brWriter = null; + + AtomicFileOperations writeOperation = + new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation)); + + try { + + 
dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); + brWriter.write(metadataInstance); + } finally { + try { + if (null != brWriter) { + brWriter.flush(); + } + } catch (Exception e) { + LOG.error("error in flushing "); + + } + CarbonUtil.closeStreams(brWriter); + writeOperation.close(); + } + } + + public static void deleteLoadsAndUpdateMetadata( + CarbonTable carbonTable, + boolean isForceDeletion) throws IOException { + if (isLoadDeletionRequired(carbonTable.getMetadataPath())) { + LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); + ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj( + identifier, LockUsage.TABLE_STATUS_LOCK); + + // Delete marked loads + boolean isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem( + identifier, isForceDeletion, details, carbonTable.getMetadataPath()); + + boolean updationCompletionStatus = false; + + if (isUpdationRequired) { + try { + // Update load metadate file after cleaning deleted nodes + if (carbonTableStatusLock.lockWithRetries()) { + LOG.info("Table status lock has been successfully acquired."); + + // read latest table status again. + LoadMetadataDetails[] latestMetadata = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + + // update the metadata details from old to new status. 
+ List<LoadMetadataDetails> latestStatus = + updateLoadMetadataFromOldToNew(details, latestMetadata); + + writeLoadMetadata(identifier, latestStatus); + } else { + String dbName = identifier.getCarbonTableIdentifier().getDatabaseName(); + String tableName = identifier.getCarbonTableIdentifier().getTableName(); + String errorMsg = "Clean files request is failed for " + + dbName + "." + tableName + + ". Not able to acquire the table status lock due to other operation " + + "running in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new TableStatusLockException(errorMsg + " Please try after some time."); + } + updationCompletionStatus = true; + } finally { + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); + if (updationCompletionStatus) { + DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( + identifier, carbonTable.getMetadataPath(), isForceDeletion); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java new file mode 100644 index 0000000..ba4c4fc --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.locks.CarbonLockFactory; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.locks.LockUsage; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +public final class DeleteLoadFolders { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DeleteLoadFolders.class.getName()); + + private DeleteLoadFolders() { + + } + + /** + * returns segment path + * + * @param identifier + * @param oneLoad + * @return + */ + private static String getSegmentPath(AbsoluteTableIdentifier identifier, + LoadMetadataDetails oneLoad) { + String segmentId = oneLoad.getLoadName(); + return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); + } + + public static void physicalFactAndMeasureMetadataDeletion( + 
AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) { + LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); + for (LoadMetadataDetails oneLoad : currentDetails) { + if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { + String path = getSegmentPath(absoluteTableIdentifier, oneLoad); + boolean status = false; + try { + if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { + CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) + || CarbonTablePath.isCarbonIndexFile(file.getName()) + || CarbonTablePath.isPartitionMapFile(file.getName())); + } + }); + + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. + if (filesToBeDeleted.length == 0) { + status = true; + } else { + + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); + status = false; + } else { + status = true; + } + } + } + // need to delete the complete folder. 
+ if (status) { + if (!file.delete()) { + LOGGER.warn( + "Unable to delete the folder as per delete command " + file.getAbsolutePath()); + } + } + + } else { + LOGGER.warn("Files are not found in segment " + path + + " it seems, files are already being deleted"); + } + } catch (IOException e) { + LOGGER.warn("Unable to delete the file as per delete command " + path); + } + } + } + } + + private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad, + boolean isForceDelete) { + if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() || + SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || + SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() || + SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus()) + && oneLoad.getVisibility().equalsIgnoreCase("true")) { + if (isForceDelete) { + return true; + } + long deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); + + return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); + + } + + return false; + } + + private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad, + boolean isForceDelete) { + if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() || + SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) { + if (isForceDelete) { + return true; + } + long deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); + + return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); + + } + + return false; + } + + private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId, + String metadataPath) { + LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); + for (LoadMetadataDetails oneLoad : currentDetails) { + if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) { + return oneLoad; + } + } + return null; + } + + public static boolean deleteLoadFoldersFromFileSystem( + AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, + 
LoadMetadataDetails[] details, String metadataPath) { + boolean isDeleted = false; + if (details != null && details.length != 0) { + for (LoadMetadataDetails oneLoad : details) { + if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + try { + if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS + || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) { + if (segmentLock.lockWithRetries(1, 5)) { + LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName()); + LoadMetadataDetails currentDetails = + getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath); + if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails, + isForceDelete)) { + oneLoad.setVisibility("false"); + isDeleted = true; + LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); + } + } else { + LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName()); + return isDeleted; + } + } else { + oneLoad.setVisibility("false"); + isDeleted = true; + LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); + } + } finally { + segmentLock.unlock(); + LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released"); + } + } + } + } + return isDeleted; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index 
23132de..b3bf93d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -26,10 +26,10 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala index 1138adf..db0fb3f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala @@ -20,7 +20,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import 
org.apache.carbondata.spark.exception.NoSuchDataMapException +import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala index 0ca7cb9..97aa056 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala @@ -19,8 +19,8 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES -import org.apache.carbondata.spark.exception.{MalformedDataMapCommandException, MalformedCarbonCommandException} class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala index 545c4de..5fe21e8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala @@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException class TestTimeSeriesDropSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala index 3065952..3f140df 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala @@ -24,8 +24,8 @@ import 
org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala index 1532328..4e5ebbb 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala @@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.dataload import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + /** * This class will test data load in which number of columns in data are more than * the number of columns in schema http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 5e5eed5..aacd28b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -23,13 +23,13 @@ import org.apache.commons.io.FileUtils import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.Row import org.apache.spark.sql.execution.BatchedDataSourceScanExec import org.apache.spark.sql.test.TestQueryExecutor.projectPath import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.spark.rdd.CarbonScanRDD http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala index ec6fff1..c06d782 100644 
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala @@ -25,10 +25,10 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterAll { val bad_records_action = CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala index 7700ed5..edcdd51 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala @@ -20,6 +20,9 @@ package org.apache.carbondata.spark.testsuite.dataload import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException +import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException + class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterAll{ override def beforeAll { sql("DROP TABLE IF EXISTS t3") @@ -32,7 +35,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA } test("test load data both file and ddl without file header exception") { - val e = intercept[Exception] { + val e = intercept[CarbonDataLoadingException] { sql( s"""LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3""") } @@ -41,7 +44,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA } test("test load data ddl provided wrong file header exception") { - val e = intercept[Exception] { + val e = intercept[CarbonDataLoadingException] { sql( s""" LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 @@ -52,7 +55,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA } test("test load data with wrong header , but without fileheader") { - val e = intercept[Exception] { + val e = intercept[InvalidLoadOptionException] { sql( s""" LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3 @@ -63,7 +66,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA } test("test load data with wrong header and fileheader") { - val e = intercept[Exception] { + val e = intercept[InvalidLoadOptionException] { sql( s""" LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3 http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala index 1851705..6759049 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala @@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.dataload import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + class TestLoadDataWithMalformedCarbonCommandException extends QueryTest with BeforeAndAfterAll { override def beforeAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala index d2c7e63..4ec9335 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import 
org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException class TestLoadOptions extends QueryTest with BeforeAndAfterAll{ http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala index a77b210..f6a049a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala @@ -19,11 +19,13 @@ package org.apache.carbondata.spark.testsuite.dataload import org.apache.spark.sql.Row import org.scalatest.BeforeAndAfterAll + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + /** * Test Class for table block size * http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index 5170c43..37007ed 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -23,11 +23,11 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.spark.exception.MalformedDataMapCommandException class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala index 7c82f75..a70584b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala @@ -29,9 +29,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, 
CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.schema.table.CarbonTable /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala index e2df07c..b9b01f8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala @@ -16,10 +16,10 @@ */ package org.apache.carbondata.spark.testsuite.datetype -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException class DateTypeTest extends QueryTest with BeforeAndAfterAll{ http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala index 7c288b3..dd1aab8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala @@ -21,10 +21,11 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + class TestSortColumns extends QueryTest with BeforeAndAfterAll { override def beforeAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml index d40e213..e80593b 100644 --- a/integration/spark-common/pom.xml +++ b/integration/spark-common/pom.xml @@ -36,11 +36,6 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-processing</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java deleted file mode 100644 index 1f3c07d..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.exception; - -public class ConcurrentOperationException extends Exception { - - /** - * The Error message. - */ - private String msg = ""; - - /** - * Constructor - * - * @param msg The error message for this exception. 
- */ - public ConcurrentOperationException(String msg) { - super(msg); - this.msg = msg; - } - - /** - * getMessage - */ - public String getMessage() { - return this.msg; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java deleted file mode 100644 index 9f441d3..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.exception; - -// After parsing carbon query successfully , if any validation fails then -// use MalformedCarbonCommandException -public class MalformedCarbonCommandException extends Exception { - - - /** - * default serial version ID. 
- */ - private static final long serialVersionUID = 1L; - - /** - * The Error message. - */ - private String msg = ""; - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public MalformedCarbonCommandException(String msg) { - super(msg); - this.msg = msg; - } - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public MalformedCarbonCommandException(String msg, Throwable t) { - super(msg, t); - this.msg = msg; - } - - /** - * getLocalizedMessage - */ - @Override - public String getLocalizedMessage() { - return super.getLocalizedMessage(); - } - - /** - * getMessage - */ - public String getMessage() { - return this.msg; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java deleted file mode 100644 index a05d8e6..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.exception; - -/** - * Throw exception when using illegal argument - */ -public class MalformedDataMapCommandException extends MalformedCarbonCommandException { - /** - * default serial version ID. - */ - private static final long serialVersionUID = 1L; - - public MalformedDataMapCommandException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java deleted file mode 100644 index 959e70d..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.exception; - -/** - * if the dataMap does not exist, carbon should throw NoSuchDataMapException - */ -public class NoSuchDataMapException extends MalformedCarbonCommandException { - - /** - * default serial version ID. - */ - private static final long serialVersionUID = 1L; - - public NoSuchDataMapException(String dataMapName, String tableName) { - super("Datamap with name " + dataMapName + " does not exist under table " + tableName); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index b98bddf..b89d49d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.util.CarbonException import org.apache.spark.unsafe.types.UTF8String +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -34,8 +35,6 @@ import 
org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMa import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.util.DataLoadingUtil object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -138,8 +137,7 @@ object CarbonStore { carbonCleanFilesLock = CarbonLockUtil .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) currentTablePartitions match { case Some(partitions) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala index ad624ee..578138f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala @@ -16,12 +16,12 @@ */ package org.apache.carbondata.spark +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - /** - * Carbon column validator - */ +/** + * Carbon column validator + */ class 
CarbonColumnValidator extends ColumnValidator { def validateColumns(allColumns: Seq[ColumnSchema]) { allColumns.foreach { columnSchema => http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala deleted file mode 100644 index dfda92c..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.load - -import java.text.SimpleDateFormat - -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.processing.loading.sort.SortScopeOptions -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - -object ValidateUtil { - - /** - * validates both timestamp and date for illegal values - * - * @param dateTimeLoadFormat - * @param dateTimeLoadOption - */ - def validateDateTimeFormat(dateTimeLoadFormat: String, dateTimeLoadOption: String): Unit = { - // allowing empty value to be configured for dateformat option. - if (dateTimeLoadFormat != null && dateTimeLoadFormat.trim != "") { - try { - new SimpleDateFormat(dateTimeLoadFormat) - } catch { - case _: IllegalArgumentException => - throw new MalformedCarbonCommandException(s"Error: Wrong option: $dateTimeLoadFormat is" + - s" provided for option $dateTimeLoadOption") - } - } - } - - def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = { - if (sortScope != null) { - // Don't support use global sort on partitioned table. 
- if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null && - sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) { - throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " + - "table.") - } - } - } - - def validateGlobalSortPartitions(globalSortPartitions: String): Unit = { - if (globalSortPartitions != null) { - try { - val num = globalSortPartitions.toInt - if (num <= 0) { - throw new MalformedCarbonCommandException("'GLOBAL_SORT_PARTITIONS' should be greater " + - "than 0.") - } - } catch { - case e: NumberFormatException => throw new MalformedCarbonCommandException(e.getMessage) - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index fa126fc..d2c059c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -310,11 +310,11 @@ class CarbonMergerRDD[K, V]( val splits = format.getSplits(job) // keep on assigning till last one is reached. 
- if (null != splits && splits.size > 0) splitsOfLastSegment = - splits.asScala + if (null != splits && splits.size > 0) { + splitsOfLastSegment = splits.asScala .map(_.asInstanceOf[CarbonInputSplit]) .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava - + } carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry => val blockInfo = new TableBlockInfo(entry.getPath.toString, entry.getStart, entry.getSegmentId, http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 90a4223..8cb8205 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{MetadataBuilder, StringType} import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.FileUtils +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -53,8 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil} -import 
org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD object CommonUtil { @@ -632,13 +632,6 @@ object CommonUtil { parsedPropertyValueString } - - def readLoadMetadataDetails(model: CarbonLoadModel): Unit = { - val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath - val details = SegmentStatusManager.readLoadMetadata(metadataPath) - model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava)) - } - def configureCSVInputFormat(configuration: Configuration, carbonLoadModel: CarbonLoadModel): Unit = { CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar) @@ -680,48 +673,6 @@ object CommonUtil { } } - def getCsvHeaderColumns( - carbonLoadModel: CarbonLoadModel, - hadoopConf: Configuration): Array[String] = { - val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) { - CarbonCommonConstants.COMMA - } else { - CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter) - } - var csvFile: String = null - var csvHeader: String = carbonLoadModel.getCsvHeader - val csvColumns = if (StringUtils.isBlank(csvHeader)) { - // read header from csv file - csvFile = carbonLoadModel.getFactFilePath.split(",")(0) - csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf) - if (StringUtils.isBlank(csvHeader)) { - throw new CarbonDataLoadingException("First line of the csv is not valid.") - } - csvHeader.toLowerCase().split(delimiter).map(_.replaceAll("\"", "").trim) - } else { - csvHeader.toLowerCase.split(CarbonCommonConstants.COMMA).map(_.trim) - } - - if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName, csvColumns, - carbonLoadModel.getCarbonDataLoadSchema)) { - if (csvFile == null) { - LOGGER.error("CSV header in DDL is not proper." 
- + " Column names in schema and CSV header are not the same.") - throw new CarbonDataLoadingException( - "CSV header in DDL is not proper. Column names in schema and CSV header are " - + "not the same.") - } else { - LOGGER.error( - "CSV header in input file is not proper. Column names in schema and csv header are not " - + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile)) - throw new CarbonDataLoadingException( - "CSV header in input file is not proper. Column names in schema and csv header are not " - + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile)) - } - } - csvColumns - } - def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = { /* User configures both csvheadercolumns, maxcolumns, @@ -862,8 +813,7 @@ object CommonUtil { try { val carbonTable = CarbonMetadata.getInstance .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) } catch { case _: Exception => LOGGER.warn(s"Error while cleaning table " +