[CARBONDATA-3064] Support separate audit log A new audit log is implemented as following: 1. a framework is added for carbon command to record the audit log automatically, see command/package.scala 2. Audit logs are output by Auditor.java, log4j config example is provided in Auditor.java file comment 3.old audit log is removed
This closes #2885 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a160dfb6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a160dfb6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a160dfb6 Branch: refs/heads/branch-1.5 Commit: a160dfb694f9bf023c6a2892877cb86599cc46d5 Parents: a0a0123 Author: Jacky Li <jacky.li...@qq.com> Authored: Wed Oct 31 14:49:38 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Nov 21 22:40:11 2018 +0530 ---------------------------------------------------------------------- .../carbondata/common/logging/impl/Audit.java | 49 - .../logging/ft/LoggingServiceTest_FT.java | 93 - .../status/DiskBasedDataMapStatusProvider.java | 2 - .../client/NonSecureDictionaryClient.java | 3 +- .../NonSecureDictionaryClientHandler.java | 3 +- .../IncrementalColumnDictionaryGenerator.java | 3 +- .../statusmanager/SegmentStatusManager.java | 14 +- .../carbondata/core/util/SessionParams.java | 18 +- .../carbondata/mv/datamap/MVAnalyzerRule.scala | 2 - .../examples/sql/CarbonSessionExample.java | 137 - .../examples/sql/JavaCarbonSessionExample.java | 94 + .../examples/CarbonSessionExample.scala | 44 +- .../carbondata/examplesCI/RunExamples.scala | 5 + .../client/SecureDictionaryClient.java | 3 +- .../server/SecureDictionaryServer.java | 3 +- .../org/apache/carbondata/api/CarbonStore.scala | 9 +- .../carbondata/spark/rdd/PartitionDropper.scala | 8 - .../spark/rdd/PartitionSplitter.scala | 6 - .../carbondata/spark/rdd/StreamHandoffRDD.scala | 12 +- .../command/carbonTableSchemaCommon.scala | 21 +- .../sql/test/ResourceRegisterAndCopier.scala | 4 +- ...CreateCarbonSourceTableAsSelectCommand.scala | 13 +- .../datamap/CarbonMergeBloomIndexFilesRDD.scala | 2 +- .../spark/rdd/AggregateDataMapCompactor.scala | 4 - .../spark/rdd/CarbonDataRDDFactory.scala | 71 +- .../spark/rdd/CarbonTableCompactor.scala | 12 - .../carbondata/stream/StreamJobManager.scala | 13 +- .../events/MergeBloomIndexEventListener.scala | 3 +- .../sql/events/MergeIndexEventListener.scala | 17 +- .../datamap/CarbonCreateDataMapCommand.scala | 12 +- .../datamap/CarbonDataMapRebuildCommand.scala | 3 + .../datamap/CarbonDataMapShowCommand.scala | 3 + .../datamap/CarbonDropDataMapCommand.scala | 6 +- .../CarbonAlterTableCompactionCommand.scala | 13 +- .../CarbonAlterTableFinishStreaming.scala | 3 + .../management/CarbonCleanFilesCommand.scala | 7 +- .../command/management/CarbonCliCommand.scala | 4 + .../CarbonDeleteLoadByIdCommand.scala | 5 +- .../CarbonDeleteLoadByLoadDateCommand.scala | 5 +- .../management/CarbonInsertIntoCommand.scala | 13 +- .../management/CarbonLoadDataCommand.scala | 355 +-- .../management/CarbonShowLoadsCommand.scala | 3 + .../management/RefreshCarbonTableCommand.scala | 18 +- .../CarbonProjectForDeleteCommand.scala | 9 +- .../CarbonProjectForUpdateCommand.scala | 4 + .../command/mutation/DeleteExecution.scala | 12 +- .../command/mutation/HorizontalCompaction.scala | 6 - .../spark/sql/execution/command/package.scala | 82 +- ...arbonAlterTableAddHivePartitionCommand.scala | 3 + ...rbonAlterTableDropHivePartitionCommand.scala | 3 + .../CarbonAlterTableDropPartitionCommand.scala | 8 +- .../CarbonAlterTableSplitPartitionCommand.scala | 9 +- .../CarbonShowCarbonPartitionsCommand.scala | 3 + .../CarbonAlterTableAddColumnCommand.scala | 9 +- .../CarbonAlterTableDataTypeChangeCommand.scala | 16 +- .../CarbonAlterTableDropColumnCommand.scala | 8 +- .../schema/CarbonAlterTableRenameCommand.scala | 9 +- .../schema/CarbonAlterTableSetCommand.scala | 9 +- .../schema/CarbonAlterTableUnsetCommand.scala | 11 +- .../schema/CarbonGetTableDetailCommand.scala | 2 + .../stream/CarbonCreateStreamCommand.scala | 3 + .../stream/CarbonDropStreamCommand.scala | 3 + .../stream/CarbonShowStreamsCommand.scala | 3 + .../CarbonCreateTableAsSelectCommand.scala | 17 +- .../table/CarbonCreateTableCommand.scala | 17 +- .../table/CarbonDescribeFormattedCommand.scala | 3 + .../command/table/CarbonDropTableCommand.scala | 6 +- .../command/table/CarbonExplainCommand.scala | 4 +- .../command/table/CarbonShowTablesCommand.scala | 1 + .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../execution/command/CarbonHiveCommands.scala | 7 +- .../sql/parser/CarbonSpark2SqlParser.scala | 6 - .../org/apache/spark/util/AlterTableUtil.scala | 10 +- .../carbondata/TestStreamingTableOpName.scala | 2647 ++++++++++++++++++ .../TestStreamingTableOperation.scala | 2647 ------------------ .../carbondata/processing/util/Auditor.java | 222 ++ .../carbondata/sdk/file/CarbonReaderTest.java | 2 - .../store/worker/SearchRequestHandler.java | 17 +- 78 files changed, 3451 insertions(+), 3487 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java b/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java deleted file mode 100644 index 1c822b9..0000000 --- a/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.common.logging.impl; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; - -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.log4j.Logger; - -public class Audit { - private static String hostName; - private static String username; - - static { - try { - hostName = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - hostName = "localhost"; - } - try { - username = UserGroupInformation.getCurrentUser().getShortUserName(); - } catch (IOException e) { - username = "unknown"; - } - } - - public static void log(Logger logger, String message) { - String threadid = String.valueOf(Thread.currentThread().getId()); - logger.log(AuditLevel.AUDIT, - "[" + hostName + "]" + "[" + username + "]" + "[Thread-" + threadid + "]" + message); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java b/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java deleted file mode 100644 index 867a154..0000000 --- a/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.common.logging.ft; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.log4j.Logger; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; -import org.apache.carbondata.common.logging.impl.AuditLevel; - -import junit.framework.TestCase; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.log4j.MDC; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class LoggingServiceTest_FT extends TestCase { - - private static Logger logger = - LogServiceFactory.getLogService(LoggingServiceTest_FT.class.getName()); - - @Before public void setUp() throws Exception { - MDC.put("MODULE", "Function Test"); - MDC.put("USER_NAME", "testuser"); - MDC.put("CLIENT_IP", "127.0.0.1"); - MDC.put("OPERATRION", "log"); - } - - @Test public void testIsAuditFileCreated() { - File f = new File("./unibiaudit.log"); - Assert.assertFalse(f.exists()); - } - - @Test public void testAudit() { - - String expectedAuditLine = - "[main] AUDIT [org.apache.carbondata.common.logging.ft.LoggingServiceTest_FT] 127.0.0.1 " - + "testuser Function Test log- audit message created"; - Audit.log(logger, "audit message created"); - - LogManager.shutdown(); - - try { - FileInputStream fstream = new FileInputStream("./carbondataaudit.log"); - BufferedReader br = new BufferedReader(new InputStreamReader(fstream)); - String actualAuditLine = null; - String strLine = null; - while ((strLine = br.readLine()) != null) { - actualAuditLine = strLine; - } - - System.out.println(actualAuditLine); - - if (actualAuditLine != null) { - int index = actualAuditLine.indexOf("[main]"); - actualAuditLine = actualAuditLine.substring(index); - Assert.assertEquals(expectedAuditLine, actualAuditLine); - } else { - Assert.assertTrue(false); - } - } catch (FileNotFoundException e) { - e.printStackTrace(); - Assert.assertTrue(!false); - } catch (IOException e) { - e.printStackTrace(); - Assert.assertTrue(false); - } - - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java index 07fe93b..27cb1cc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.List; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; @@ -148,7 +147,6 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi } else { String errorMsg = "Upadating datamapstatus is failed due to another process taken the lock" + " for updating it"; - Audit.log(LOG, errorMsg); LOG.error(errorMsg); throw new IOException(errorMsg + " Please try after some time."); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java index 2e58255..d5c2072 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java @@ -19,7 +19,6 @@ package org.apache.carbondata.core.dictionary.client; import java.net.InetSocketAddress; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage; import io.netty.bootstrap.Bootstrap; @@ -52,7 +51,7 @@ public class NonSecureDictionaryClient implements DictionaryClient { */ @Override public void startClient(String secretKey, String address, int port, boolean encryptSecureServer) { - Audit.log(LOGGER, "Starting client on " + address + " " + port); + LOGGER.info("Starting client on " + address + " " + port); long start = System.currentTimeMillis(); // Create an Event with 1 thread. workerGroup = new NioEventLoopGroup(1); http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java index 457441f..17e9c7c 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java @@ -21,7 +21,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage; import io.netty.buffer.ByteBuf; @@ -49,7 +48,7 @@ public class NonSecureDictionaryClientHandler extends ChannelInboundHandlerAdapt public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; channelFutureListener = new DictionaryChannelFutureListener(ctx); - Audit.log(LOGGER, "Connected client " + ctx); + LOGGER.info("Connected client " + ctx); super.channelActive(ctx); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java index bf0f094..7e4be5d 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; @@ -148,7 +147,7 @@ public class IncrementalColumnDictionaryGenerator implements BiDictionary<Intege long sortIndexWriteTime = System.currentTimeMillis() - t3; // update Meta Data updateMetaData(dictionaryWriter); - Audit.log(LOGGER, "\n columnName: " + dimension.getColName() + + LOGGER.info("\n columnName: " + dimension.getColName() + "\n columnId: " + dimension.getColumnId() + "\n new distinct values count: " + distinctValues.size() + "\n create dictionary cache: " + dictCacheTime + http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index fbb765b..32b1e78 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -31,7 +31,6 @@ import java.util.Arrays; import java.util.List; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -371,7 +370,6 @@ public class SegmentStatusManager { String errorMsg = "Delete segment by id is failed for " + tableDetails + ". Not able to acquire the table status lock due to other operation running " + "in the background."; - Audit.log(LOG, errorMsg); LOG.error(errorMsg); throw new Exception(errorMsg + " Please try after some time."); } @@ -381,7 +379,7 @@ public class SegmentStatusManager { } } else { - Audit.log(LOG, "Delete segment by Id is failed. No matching segment id found."); + LOG.error("Delete segment by Id is failed. No matching segment id found."); return loadIds; } @@ -389,7 +387,6 @@ public class SegmentStatusManager { String errorMsg = "Delete segment by id is failed for " + tableDetails + ". Not able to acquire the delete segment lock due to another delete " + "operation is running in the background."; - Audit.log(LOG, errorMsg); LOG.error(errorMsg); throw new Exception(errorMsg + " Please try after some time."); } @@ -453,7 +450,6 @@ public class SegmentStatusManager { String errorMsg = "Delete segment by date is failed for " + tableDetails + ". Not able to acquire the table status lock due to other operation running " + "in the background."; - Audit.log(LOG, errorMsg); LOG.error(errorMsg); throw new Exception(errorMsg + " Please try after some time."); @@ -463,7 +459,7 @@ public class SegmentStatusManager { } } else { - Audit.log(LOG, "Delete segment by date is failed. No matching segment found."); + LOG.error("Delete segment by date is failed. No matching segment found."); invalidLoadTimestamps.add(loadDate); return invalidLoadTimestamps; } @@ -472,7 +468,6 @@ public class SegmentStatusManager { String errorMsg = "Delete segment by date is failed for " + tableDetails + ". Not able to acquire the delete segment lock due to another delete " + "operation is running in the background."; - Audit.log(LOG, errorMsg); LOG.error(errorMsg); throw new Exception(errorMsg + " Please try after some time."); } @@ -579,7 +574,7 @@ public class SegmentStatusManager { } if (!loadFound) { - Audit.log(LOG, "Delete segment by ID is failed. No matching segment id found :" + loadId); + LOG.error("Delete segment by ID is failed. No matching segment id found :" + loadId); invalidLoadIds.add(loadId); return invalidLoadIds; } @@ -633,7 +628,7 @@ public class SegmentStatusManager { if (!loadFound) { invalidLoadTimestamps.add(loadDate); - Audit.log(LOG, "Delete segment by date is failed. No matching segment found."); + LOG.error("Delete segment by date is failed. No matching segment found."); return invalidLoadTimestamps; } return invalidLoadTimestamps; @@ -992,7 +987,6 @@ public class SegmentStatusManager { dbName + "." + tableName + ". Not able to acquire the table status lock due to other operation " + "running in the background."; - Audit.log(LOG, errorMsg); LOG.error(errorMsg); throw new IOException(errorMsg + " Please try after some time."); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index 2181107..f87784e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; @@ -99,26 +98,13 @@ public class SessionParams implements Serializable, Cloneable { * @return properties value */ public SessionParams addProperty(String key, String value) throws InvalidConfigurationException { - return addProperty(key, value, true); - } - - /** - * This method will be used to add a new property - * - * @param key - * @return properties value - */ - public SessionParams addProperty(String key, String value, Boolean doAuditing) - throws InvalidConfigurationException { boolean isValidConf = validateKeyValue(key, value); if (isValidConf) { if (key.equals(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)) { value = value.toUpperCase(); } - if (doAuditing) { - Audit.log(LOGGER, - "The key " + key + " with value " + value + " added in the session param"); - } + LOGGER.info( + "The key " + key + " with value " + value + " added in the session param"); sProps.put(key, value); } return this; http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala index 5dc6b27..bb56e58 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.DataMapSchema @@ -72,7 +71,6 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable if (modularPlan.find (_.rewritten).isDefined) { val compactSQL = modularPlan.asCompactSQL - Audit.log(LOGGER, s"\n$compactSQL\n") val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed analyzed } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java deleted file mode 100644 index 37a12e4..0000000 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples.sql; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.examples.util.ExampleUtils; -import org.apache.log4j.PropertyConfigurator; -import org.apache.spark.sql.SparkSession; - -import java.io.File; -import java.io.IOException; - -public class CarbonSessionExample { - - public static void main(String[] args) { - File file = new File(CarbonSessionExample.class.getResource("/").getPath() + "../.."); - try { - String rootPath = file.getCanonicalPath(); - System.setProperty("path.target", rootPath + "/examples/spark2/target"); - PropertyConfigurator.configure(rootPath + "/examples/spark2/src/main/resources/log4j.properties"); - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true"); - SparkSession carbon = ExampleUtils.createCarbonSession("JavaCarbonSessionExample", 1); - carbon.sparkContext().setLogLevel("INFO"); - - carbon.sql("DROP TABLE IF EXISTS carbonsession_table"); - carbon.sql("DROP TABLE IF EXISTS stored_as_carbondata_table"); - - carbon.sql( - "CREATE TABLE carbonsession_table( " + - "shortField SHORT, " + - "intField INT, " + - "bigintField LONG, " + - "doubleField DOUBLE, " + - "stringField STRING, " + - "timestampField TIMESTAMP, " + - "decimalField DECIMAL(18,2), " + - "dateField DATE, " + - "charField CHAR(5), " + - "floatField FLOAT " + - ") " + - "STORED AS carbondata" + - "TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')" - ); - - String path = rootPath + "/examples/spark2/src/main/resources/data.csv"; - carbon.sql( - "LOAD DATA LOCAL INPATH " + "\'" + path + "\' " + - "INTO TABLE carbonsession_table " + - "OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')" - ); - - carbon.sql( - "SELECT charField, stringField, intField " + - "FROM carbonsession_table " + - "WHERE stringfield = 'spark' AND decimalField > 40" - ).show(); - - carbon.sql( - "SELECT * " + - "FROM carbonsession_table WHERE length(stringField) = 5" - ).show(); - - carbon.sql( - "SELECT * " + - "FROM carbonsession_table " + - "WHERE date_format(dateField, \'yyyy-MM-dd \') = \'2015-07-23\'" - ).show(); - - carbon.sql("SELECT count(stringField) FROM carbonsession_table").show(); - - carbon.sql( - "SELECT sum(intField), stringField " + - "FROM carbonsession_table " + - "GROUP BY stringField" - ).show(); - - carbon.sql( - "SELECT t1.*, t2.* " + - "FROM carbonsession_table t1, carbonsession_table t2 " + - "WHERE t1.stringField = t2.stringField" - ).show(); - - carbon.sql( - "WITH t1 AS ( " + - "SELECT * FROM carbonsession_table " + - "UNION ALL " + - "SELECT * FROM carbonsession_table" + - ") " + - "SELECT t1.*, t2.* " + - "FROM t1, carbonsession_table t2 " + - "WHERE t1.stringField = t2.stringField" - ).show(); - - carbon.sql( - "SELECT * " + - "FROM carbonsession_table " + - "WHERE stringField = 'spark' and floatField > 2.8" - ).show(); - - carbon.sql( - "CREATE TABLE stored_as_carbondata_table( " + - "name STRING, " + - "age INT" + - ") " + - "STORED AS carbondata" - ); - - carbon.sql("INSERT INTO stored_as_carbondata_table VALUES ('Bob',28) "); - carbon.sql("SELECT * FROM stored_as_carbondata_table").show(); - - carbon.sql("DROP TABLE IF EXISTS carbonsession_table"); - carbon.sql("DROP TABLE IF EXISTS stored_as_carbondata_table"); - - carbon.close(); - } - catch (IOException e) { - e.printStackTrace(); - System.out.println(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java new file mode 100644 index 0000000..db2c4fd --- /dev/null +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples.sql; + +import java.io.File; +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.examples.util.ExampleUtils; + +import org.apache.spark.sql.CarbonSession; +import org.apache.spark.sql.SparkSession; + +public class JavaCarbonSessionExample { + + public static void main(String[] args) throws IOException { + // set timestamp and date format used in data.csv for loading + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd"); + + // create CarbonSession + + SparkSession.Builder builder = SparkSession.builder() + .master("local") + .appName("JavaCarbonSessionExample") + .config("spark.driver.host", "localhost"); + + SparkSession carbon = new CarbonSession.CarbonBuilder(builder) + .getOrCreateCarbonSession(); + + exampleBody(carbon); + carbon.close(); + } + + public static void exampleBody(SparkSession carbon) throws IOException { + carbon.sql("DROP TABLE IF EXISTS source"); + + carbon.sql( + "CREATE TABLE source( " + "shortField SHORT, " + "intField INT, " + "bigintField LONG, " + + "doubleField DOUBLE, " + "stringField STRING, " + "timestampField TIMESTAMP, " + + "decimalField DECIMAL(18,2), " + "dateField DATE, " + "charField CHAR(5), " + + "floatField FLOAT " + ") " + "STORED AS carbondata"); + + String rootPath = + new File(JavaCarbonSessionExample.class.getResource("/").getPath() + "../../../..") + .getCanonicalPath(); + String path = rootPath + "/examples/spark2/src/main/resources/data.csv"; + carbon.sql("LOAD DATA LOCAL INPATH " + "\'" + path + "\' " + "INTO TABLE source " + + "OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')"); + + carbon.sql("SELECT charField, stringField, intField " + "FROM source " + + "WHERE stringfield = 'spark' AND decimalField > 40").show(); + + carbon.sql("SELECT * " + "FROM source WHERE length(stringField) = 5").show(); + + carbon.sql("SELECT * " + "FROM source " + + "WHERE date_format(dateField, \'yyyy-MM-dd \') = \'2015-07-23\'").show(); + + carbon.sql("SELECT count(stringField) FROM source").show(); + + carbon.sql("SELECT sum(intField), stringField " + "FROM source " + "GROUP BY stringField") + .show(); + + carbon.sql("SELECT t1.*, t2.* " + "FROM source t1, source t2 " + + "WHERE t1.stringField = t2.stringField").show(); + + carbon.sql( + "WITH t1 AS ( " + "SELECT * FROM source " + "UNION ALL " + "SELECT * FROM source" + ") " + + "SELECT t1.*, t2.* " + "FROM t1, source t2 " + + "WHERE t1.stringField = t2.stringField").show(); + + carbon.sql("SELECT * " + "FROM source " + "WHERE stringField = 'spark' and floatField > 2.8") + .show(); + + carbon.sql("DROP TABLE IF EXISTS source"); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala index e63e852..b6921f2 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala @@ -49,13 +49,12 @@ object CarbonSessionExample { val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath - spark.sql("DROP TABLE IF EXISTS carbonsession_table") - spark.sql("DROP TABLE IF EXISTS stored_as_carbondata_table") + spark.sql("DROP TABLE IF EXISTS source") // Create table spark.sql( s""" - | CREATE TABLE carbonsession_table( + | CREATE TABLE source( | shortField SHORT, | intField INT, | bigintField LONG, @@ -67,8 +66,7 @@ object CarbonSessionExample { | charField CHAR(5), | floatField FLOAT | ) - | STORED BY 'carbondata' - | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField') + | STORED AS carbondata """.stripMargin) val path = s"$rootPath/examples/spark2/src/main/resources/data.csv" @@ -77,7 +75,7 @@ object CarbonSessionExample { spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbonsession_table + | INTO TABLE source | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') """.stripMargin) // scalastyle:on @@ -85,70 +83,58 @@ object CarbonSessionExample { spark.sql( s""" | SELECT charField, stringField, intField - | FROM carbonsession_table + | FROM source | WHERE stringfield = 'spark' AND decimalField > 40 """.stripMargin).show() spark.sql( s""" | SELECT * - | FROM carbonsession_table WHERE length(stringField) = 5 + | FROM source WHERE length(stringField) = 5 """.stripMargin).show() spark.sql( s""" | SELECT * - | FROM carbonsession_table WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23" + | FROM source WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23" """.stripMargin).show() - spark.sql("SELECT count(stringField) FROM carbonsession_table").show() + spark.sql("SELECT count(stringField) FROM source").show() spark.sql( s""" | SELECT sum(intField), stringField - | FROM carbonsession_table + | FROM source | GROUP BY stringField """.stripMargin).show() spark.sql( s""" | SELECT t1.*, t2.* - | FROM carbonsession_table t1, carbonsession_table t2 + | FROM source t1, source t2 | WHERE t1.stringField = t2.stringField """.stripMargin).show() spark.sql( s""" | WITH t1 AS ( - | SELECT * FROM carbonsession_table + | SELECT * FROM source | UNION ALL - | SELECT * FROM carbonsession_table + | SELECT * FROM source | ) | SELECT t1.*, t2.* - | FROM t1, carbonsession_table t2 + | FROM t1, source t2 | WHERE t1.stringField = t2.stringField """.stripMargin).show() spark.sql( s""" | SELECT * - | FROM carbonsession_table + | FROM source | WHERE stringField = 'spark' and floatField > 2.8 """.stripMargin).show() - spark.sql( - s""" - | CREATE TABLE stored_as_carbondata_table( - | name STRING, - | age INT - | ) - | STORED AS carbondata - """.stripMargin) - spark.sql("INSERT INTO stored_as_carbondata_table VALUES ('Bob',28) ") - spark.sql("SELECT * FROM stored_as_carbondata_table").show() - // Drop table - spark.sql("DROP TABLE IF EXISTS carbonsession_table") - spark.sql("DROP TABLE IF EXISTS stored_as_carbondata_table") + spark.sql("DROP TABLE IF EXISTS source") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala index 6a13dc3..25ef9af 100644 --- a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala +++ b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala @@ -24,6 +24,7 @@ import org.apache.carbondata.examples._ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.examples.sdk.CarbonReaderExample +import org.apache.carbondata.examples.sql.JavaCarbonSessionExample /** * Test suite for examples @@ -67,6 +68,10 @@ class RunExamples extends QueryTest with BeforeAndAfterAll { CarbonSessionExample.exampleBody(spark) } + test("JavaCarbonSessionExample") { + JavaCarbonSessionExample.exampleBody(spark) + } + test("CarbonSortColumnsExample") { CarbonSortColumnsExample.exampleBody(spark) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java index 3aa7fbf..088d9e2 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java @@ -19,7 +19,6 @@ package org.apache.carbondata.spark.dictionary.client; import java.nio.charset.Charset; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.dictionary.client.DictionaryClient; import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage; @@ -60,7 +59,7 @@ public class SecureDictionaryClient implements DictionaryClient { */ @Override public void startClient(String secretKey, String address, int port, boolean encryptSecureServer) { - Audit.log(LOGGER, "Starting client on " + address + " " + port); + LOGGER.info("Starting client on " + address + " " + port); long start = System.currentTimeMillis(); SecurityManager securityMgr; http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java index a029da0..9ae6d35 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.common.logging.impl.Audit; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage; import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType; @@ -145,7 +144,7 @@ public class SecureDictionaryServer extends AbstractDictionaryServer implements //iteratively listening to newports context .createServer(host, newPort, Lists.<TransportServerBootstrap>newArrayList(bootstrap)); - Audit.log(LOGGER, + LOGGER.info( "Dictionary Server started, Time spent " + (System.currentTimeMillis() - start) + " Listening on port " + newPort); this.port = newPort; http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index df173cd..45d472e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -29,7 +29,6 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.common.Strings import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory @@ -159,7 +158,6 @@ object CarbonStore { carbonTable: CarbonTable, forceTableClean: Boolean, currentTablePartitions: Option[Seq[PartitionSpec]] = None): Unit = { - Audit.log(LOGGER, s"The clean files request has been received for $dbName.$tableName") var carbonCleanFilesLock: ICarbonLock = null val absoluteTableIdentifier = if (forceTableClean) { AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName) @@ -203,7 +201,6 @@ object CarbonStore { CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) } } - Audit.log(LOGGER, s"Clean files operation is success for $dbName.$tableName.") } /** @@ -282,7 +279,6 @@ object CarbonStore { tableName: String, carbonTable: CarbonTable): Unit = { - Audit.log(LOGGER, s"Delete segment by Id request has been received for $dbName.$tableName") validateLoadIds(loadids) val path = carbonTable.getMetadataPath @@ -291,7 +287,7 @@ object CarbonStore { val invalidLoadIds = SegmentStatusManager.updateDeletionStatus( carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala if (invalidLoadIds.isEmpty) { - Audit.log(LOGGER, s"Delete segment by Id is successfull for $dbName.$tableName.") + LOGGER.info(s"Delete segment by Id is successfull for $dbName.$tableName.") } else { sys.error(s"Delete segment by Id is failed. Invalid ID is: ${invalidLoadIds.mkString(",")}") } @@ -308,7 +304,6 @@ object CarbonStore { dbName: String, tableName: String, carbonTable: CarbonTable): Unit = { - Audit.log(LOGGER, s"Delete segment by Id request has been received for $dbName.$tableName") val time = validateTimeFormat(timestamp) val path = carbonTable.getMetadataPath @@ -321,7 +316,7 @@ object CarbonStore { path, time).asScala if (invalidLoadTimestamps.isEmpty) { - Audit.log(LOGGER, s"Delete segment by date is successful for $dbName.$tableName.") + LOGGER.info(s"Delete segment by date is successful for $dbName.$tableName.") } else { sys.error("Delete segment by date is failed. No matching segment found.") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala index 353a478..e215ab0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala @@ -22,9 +22,7 @@ import java.io.IOException import org.apache.spark.sql.execution.command.{AlterPartitionModel, DropPartitionCallableModel} import org.apache.spark.util.PartitionUtils -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory} @@ -89,8 +87,6 @@ object PartitionDropper { finalDropStatus = dropStatus.forall(_._2) } if (!finalDropStatus) { - Audit.log(logger, s"Drop Partition request failed for table " + - s"${ dbName }.${ tableName }") logger.error(s"Drop Partition request failed for table " + s"${ dbName }.${ tableName }") } @@ -105,8 +101,6 @@ object PartitionDropper { case e: IOException => throw new IOException("Exception while delete original carbon files ", e) } - Audit.log(logger, s"Drop Partition request completed for table " + - s"${ dbName }.${ tableName }") logger.info(s"Drop Partition request completed for table " + s"${ dbName }.${ tableName }") } @@ -117,8 +111,6 @@ object PartitionDropper { } else { PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier, Seq(partitionId).toList, dbName, tableName, partitionInfo) - Audit.log(logger, s"Drop Partition request completed for table " + - s"${ dbName }.${ tableName }") logger.info(s"Drop Partition request completed for table " + s"${ dbName }.${ tableName }") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala index 369ad51..f99b5e2 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala @@ -22,9 +22,7 @@ import java.io.IOException import org.apache.spark.sql.execution.command.{AlterPartitionModel, SplitPartitionCallableModel} import org.apache.spark.util.PartitionUtils -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory} object PartitionSplitter { @@ -75,8 +73,6 @@ object PartitionSplitter { finalSplitStatus = splitStatus.forall(_._2) } if (!finalSplitStatus) { - Audit.log(logger, s"Add/Split Partition request failed for table " + - s"${ databaseName }.${ tableName }") logger.error(s"Add/Split Partition request failed for table " + s"${ databaseName }.${ tableName }") } @@ -90,8 +86,6 @@ object PartitionSplitter { case e: IOException => throw new IOException("Exception while delete original carbon files ", e) } - Audit.log(logger, s"Add/Split Partition request completed for table " + - s"${ databaseName }.${ tableName }") logger.info(s"Add/Split Partition request completed for table " + s"${ databaseName }.${ tableName }") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index 13172c7..606aa01 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -21,24 +21,19 @@ import java.text.SimpleDateFormat import java.util import java.util.{Date, UUID} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{Job, RecordReader, TaskAttemptID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.{Partition, SerializableWritable, TaskContext} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} -import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} @@ -346,8 +341,6 @@ object StreamHandoffRDD { LOGGER.info("********starting clean up**********") CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) LOGGER.info("********clean up done**********") - Audit.log(LOGGER, s"Handoff is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.error("Cannot write load metadata file as handoff failed") throw new Exception(errorMessage) } @@ -367,9 +360,6 @@ object StreamHandoffRDD { OperationListenerBus.getInstance() .fireEvent(loadTablePostStatusUpdateEvent, operationContext) if (!done) { - val errorMessage = "Handoff failed due to failure in table status updation." - Audit.log(LOGGER, "Handoff is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.error("Handoff failed due to failure in table status updation.") throw new Exception(errorMessage) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index e051456..3bb7731 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -22,20 +22,15 @@ import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.ListBuffer import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.util.CarbonException -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} @@ -45,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} +import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CompactionType @@ -281,20 +276,10 @@ class AlterTableColumnSchemaGenerator( allColumns.filter(x => !x.isInvisible).groupBy(_.getColumnName) .foreach(f => if (f._2.size > 1) { val name = f._1 - LOGGER.error(s"Duplicate column found with name: $name") - Audit.log(LOGGER, - s"Validation failed for Create/Alter Table Operation " + - s"for ${ dbName }.${ alterTableModel.tableName }. " + - s"Duplicate column found with name: $name") sys.error(s"Duplicate column found with name: $name") }) if (newCols.exists(_.getDataType.isComplexType)) { - LOGGER.error(s"Complex column cannot be added") - Audit.log(LOGGER, - s"Validation failed for Create/Alter Table Operation " + - s"for ${ dbName }.${ alterTableModel.tableName }. " + - s"Complex column cannot be added") sys.error(s"Complex column cannot be added") } @@ -782,10 +767,6 @@ class TableNewProcessor(cm: TableModel) { allColumns.groupBy(_.getColumnName).foreach { f => if (f._2.size > 1) { val name = f._1 - LOGGER.error(s"Duplicate column found with name: $name") - Audit.log(LOGGER, - s"Validation failed for Create/Alter Table Operation " + - s"Duplicate column found with name: $name") CarbonException.analysisException(s"Duplicate dimensions found with name: $name") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala index 87106e0..ff9b8c4 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala @@ -24,9 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.io.IOUtils -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.HdfsFileLock import org.apache.carbondata.core.util.CarbonUtil @@ -50,7 +48,7 @@ object ResourceRegisterAndCopier { if (!file.exists()) { sys.error(s"""Provided path $hdfsPath does not exist""") } - Audit.log(LOGGER, "Try downloading resource data") + LOGGER.info("Try downloading resource data") val lock = new HdfsFileLock(hdfsPath, "/resource.lock") var bool = false try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala index 74c0f97..ee9fb0f 100644 --- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala +++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala @@ -22,7 +22,7 @@ import java.net.URI import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand} +import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand} import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession} @@ -41,11 +41,15 @@ case class CreateCarbonSourceTableAsSelectCommand( table: CatalogTable, mode: SaveMode, query: LogicalPlan) - extends RunnableCommand { + extends AtomicRunnableCommand { override protected def innerChildren: Seq[LogicalPlan] = Seq(query) - override def run(sparkSession: SparkSession): Seq[Row] = { + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] ={ assert(table.tableType != CatalogTableType.VIEW) assert(table.provider.isDefined) @@ -53,6 +57,7 @@ case class CreateCarbonSourceTableAsSelectCommand( val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = table.identifier.copy(database = Some(db)) val tableName = tableIdentWithDB.unquotedString + setAuditTable(db, table.identifier.table) if (sessionState.catalog.tableExists(tableIdentWithDB)) { assert(mode != SaveMode.Overwrite, @@ -127,4 +132,6 @@ case class CreateCarbonSourceTableAsSelectCommand( throw ex } } + + override protected def opName: String = "CREATE TABLE AS SELECT" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala index 1aa8b82..c9d5eb1 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala @@ -34,7 +34,7 @@ import org.apache.carbondata.spark.rdd.CarbonRDD * RDD to merge all bloomindex files of specified segment for bloom datamap */ class CarbonMergeBloomIndexFilesRDD( - @transient ss: SparkSession, + @transient private val ss: SparkSession, carbonTable: CarbonTable, segmentIds: Seq[String], bloomDatamapNames: Seq[String], http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala index a0bdd64..2119e4c 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql.execution.command.CompactionModel import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil -import org.apache.carbondata.api.CarbonStore.LOGGER -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.impl.FileFactory @@ -132,8 +130,6 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel, carbonLoadModel.getTableName) LOGGER .info(s"Compaction request for datamap ${ carbonTable.getTableUniqueName } is successful") - Audit.log(LOGGER, - s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful") } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 2d0bc58..dc52517 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -28,27 +28,24 @@ import scala.collection.mutable.ListBuffer import scala.util.Random import scala.util.control.Breaks._ -import com.univocity.parsers.common.TextParsingException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD} -import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext} +import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel} import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.datamap.status.DataMapStatusManager @@ -68,16 +65,15 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} -import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException} +import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} +import org.apache.carbondata.processing.util.{Auditor, CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _} import org.apache.carbondata.spark.load._ import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} @@ -132,8 +128,6 @@ object CarbonDataRDDFactory { } } } else { - Audit.log(LOGGER, "Not able to acquire the system level compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.error("Not able to acquire the compaction lock for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") CarbonCompactionUtil @@ -308,9 +302,7 @@ object CarbonDataRDDFactory { hadoopConf: Configuration, dataFrame: Option[DataFrame] = None, updateModel: Option[UpdateTableModel] = None, - operationContext: OperationContext): Unit = { - Audit.log(LOGGER, s"Data load request has been received for table" + - s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + operationContext: OperationContext): LoadMetadataDetails = { // Check if any load need to be deleted before loading new data val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null @@ -428,11 +420,11 @@ object CarbonDataRDDFactory { if (updateModel.isDefined) { if (loadStatus == SegmentStatus.LOAD_FAILURE) { CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage) - return + return null } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS && updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS && carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) { - return + return null } else { // in success case handle updation of the table status file. // success case. @@ -451,9 +443,7 @@ object CarbonDataRDDFactory { // this means that the update doesnt have any records to update so no need to do table // status file updation. if (resultSize == 0) { - Audit.log(LOGGER, "Data update is successful with 0 rows updation for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - return + return null } if (CarbonUpdateUtil.updateTableMetadataStatus( segmentDetails, @@ -462,20 +452,15 @@ object CarbonDataRDDFactory { true, new util.ArrayList[Segment](0), new util.ArrayList[Segment](segmentFiles), "")) { - Audit.log(LOGGER, "Data update is successful for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } else { - val errorMessage = "Data update failed due to failure in table status updation." - Audit.log(LOGGER, "Data update is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.error("Data update failed due to failure in table status updation.") updateModel.get.executorErrors.errorMsg = errorMessage updateModel.get.executorErrors.failureCauses = FailureCauses .STATUS_FILE_UPDATION_FAILURE - return + return null } } - return + return null } val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String] if (loadStatus == SegmentStatus.LOAD_FAILURE) { @@ -488,8 +473,6 @@ object CarbonDataRDDFactory { clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId) } LOGGER.info("********clean up done**********") - Audit.log(LOGGER, s"Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.warn("Cannot write load metadata file as data load failed") throw new Exception(errorMessage) } else { @@ -507,8 +490,6 @@ object CarbonDataRDDFactory { clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId) } LOGGER.info("********clean up done**********") - Audit.log(LOGGER, s"Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") throw new Exception(status(0)._2._2.errorMsg) } // as no record loaded in new segment, new segment should be deleted @@ -542,11 +523,10 @@ object CarbonDataRDDFactory { carbonTable.getCarbonTableIdentifier, carbonLoadModel) OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) - val done = + val (done, writtenSegment) = updateTableStatus( status, carbonLoadModel, - loadStatus, newEntryLoadStatus, overwriteTable, segmentFileName, @@ -575,18 +555,17 @@ object CarbonDataRDDFactory { clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId) } LOGGER.info("********clean up done**********") - Audit.log(LOGGER, "Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.error("Data load failed due to failure in table status updation.") throw new Exception("Data load failed due to failure in table status updation.") } if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) { - Audit.log(LOGGER, "Data load is partially successful for " + + LOGGER.info("Data load is partially successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } else { - Audit.log(LOGGER, "Data load is successful for " + + LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } + try { // compaction handling if (carbonTable.isHivePartitionTable) { @@ -605,6 +584,7 @@ object CarbonDataRDDFactory { operationContext) carbonLoadModel.setMergedSegmentIds(compactedSegments) } + writtenSegment } catch { case e: Exception => throw new Exception( @@ -845,8 +825,6 @@ object CarbonDataRDDFactory { s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }") if (!carbonTable.isChildDataMap && CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) { - Audit.log(LOGGER, s"Compaction request received for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") val compactionSize = 0 val isCompactionTriggerByDDl = false val compactionModel = CompactionModel( @@ -905,8 +883,6 @@ object CarbonDataRDDFactory { throw e } } else { - Audit.log(LOGGER, "Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}") LOGGER.error("Not able to acquire the compaction lock for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}") } @@ -916,15 +892,22 @@ object CarbonDataRDDFactory { /** * Update table status file after data loading + * @param status status collected from each task + * @param carbonLoadModel load model used for loading + * @param newEntryLoadStatus segment status to set in the metadata + * @param overwriteTable true the operation is overwrite + * @param segmentFileName segment file name + * @param uuid uuid for the table status file name + * @return whether operation success and + * the segment metadata that written into the segment status file */ private def updateTableStatus( status: Array[(String, (LoadMetadataDetails, ExecutionErrors))], carbonLoadModel: CarbonLoadModel, - loadStatus: SegmentStatus, newEntryLoadStatus: SegmentStatus, overwriteTable: Boolean, segmentFileName: String, - uuid: String = ""): Boolean = { + uuid: String = ""): (Boolean, LoadMetadataDetails) = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val metadataDetails = if (status != null && status.size > 0 && status(0) != null) { status(0)._2._1 @@ -948,14 +931,12 @@ object CarbonDataRDDFactory { if (!done) { val errorMessage = s"Dataload failed due to failure in table status updation for" + s" ${carbonLoadModel.getTableName}" - Audit.log(LOGGER, "Data load is failed for " + - s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") - LOGGER.error("Dataload failed due to failure in table status updation.") + LOGGER.error(errorMessage) throw new Exception(errorMessage) } else { DataMapStatusManager.disableAllLazyDataMaps(carbonTable) } - done + (done, metadataDetails) } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index ac83212..16309fe 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -27,12 +27,9 @@ import scala.collection.mutable import org.apache.spark.sql.SQLContext import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel} -import org.apache.carbondata.api.CarbonStore.LOGGER -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.metadata.SegmentFileStore -import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope} import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ @@ -304,25 +301,16 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, // true because compaction for all datamaps will be finished at a time to the maximum level // possible (level 1, 2 etc). so we need to check for either condition if (!statusFileUpdation || !commitComplete) { - Audit.log(LOGGER, - s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." + - s"${ carbonLoadModel.getTableName }") LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." + s"${ carbonLoadModel.getTableName }") throw new Exception(s"Compaction failed to update metadata for table" + s" ${ carbonLoadModel.getDatabaseName }." + s"${ carbonLoadModel.getTableName }") } else { - Audit.log(LOGGER, - s"Compaction request completed for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.info(s"Compaction request completed for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } } else { - Audit.log(LOGGER, s"Compaction request failed for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" - ) LOGGER.error(s"Compaction request failed for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") throw new Exception("Compaction Failure in Merger Rdd.") http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala index 1b9fb44..ece688e 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala @@ -21,19 +21,15 @@ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonEnv, DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.exceptions.NoSuchStreamException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit -import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.spark.StreamingOption import org.apache.carbondata.streaming.CarbonStreamException @@ -160,9 +156,6 @@ object StreamJobManager { StreamJobDesc(job, streamName, sourceTable.getDatabaseName, sourceTable.getTableName, sinkTable.getDatabaseName, sinkTable.getTableName, query, thread)) - Audit.log(LOGGER, s"STREAM $streamName started with job id '${job.id.toString}', " + - s"from ${sourceTable.getDatabaseName}.${sourceTable.getTableName} " + - s"to ${sinkTable.getDatabaseName}.${sinkTable.getTableName}") job.id.toString } else { thread.interrupt() @@ -181,10 +174,6 @@ object StreamJobManager { jobDesc.streamingQuery.stop() jobDesc.thread.interrupt() jobs.remove(streamName) - Audit.log(LOGGER, - s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " + - s"from ${jobDesc.sourceDb}.${jobDesc.sourceTable} " + - s"to ${jobDesc.sinkDb}.${jobDesc.sinkTable}") } else { if (!ifExists) { throw new NoSuchStreamException(streamName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala index ea0f9e7..8fb3cc1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala @@ -24,7 +24,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -37,7 +36,7 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging { override def onEvent(event: Event, operationContext: OperationContext): Unit = { event match { case datamapPostEvent: BuildDataMapPostExecutionEvent => - Audit.log(LOGGER, "Load post status event-listener called for merge bloom index") + LOGGER.info("Load post status event-listener called for merge bloom index") val carbonTableIdentifier = datamapPostEvent.identifier val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier) val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)