[CARBONDATA-3064] Support separate audit log

A new audit log is implemented as follows:
    1. A framework is added so that every Carbon command records the audit log 
automatically; see command/package.scala
    2. Audit logs are written by Auditor.java; a log4j config example is provided 
in the Auditor.java file comment
    3. The old audit log is removed

This closes #2885
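
For readers skimming the diff: the framework itself lives in command/package.scala
(hunk included below) and the new Auditor.java (222 added lines, body not shown in
this excerpt). As a rough illustration only, the following Scala sketch shows the
wrap-and-record idea the commit message describes; every name in it
(AuditableCommand, opName, runBody, the "carbon.audit" logger) is invented for
illustration and is not the actual CarbonData API.

    // Hypothetical sketch of the auto-audit idea, not the actual
    // command/package.scala: the framework wraps command execution so an
    // audit line is emitted on start, success and failure, without each
    // command calling the audit logger itself.
    import java.util.UUID

    import org.apache.log4j.Logger

    trait AuditableCommand {
      // routed to a dedicated appender by the log4j config sketched below
      private val auditLogger = Logger.getLogger("carbon.audit")

      // each concrete command describes itself
      protected def opName: String

      // the command's real work
      protected def runBody(): Seq[String]

      // the framework entry point: auditing happens automatically
      final def run(): Seq[String] = {
        val opId = UUID.randomUUID().toString
        auditLogger.info(s"$opName started (id=$opId)")
        try {
          val rows = runBody()
          auditLogger.info(s"$opName succeeded (id=$opId)")
          rows
        } catch {
          case e: Exception =>
            auditLogger.info(s"$opName failed (id=$opId): ${e.getMessage}")
            throw e
        }
      }
    }

The authoritative log4j example ships in the Auditor.java file comment; a minimal
configuration in that spirit, assuming the "carbon.audit" logger name from the
sketch above, would route audit records to their own file:

    # assumes the logger name used in the sketch above; the authoritative
    # example is in the Auditor.java file comment
    log4j.logger.carbon.audit=INFO, AUDIT
    log4j.additivity.carbon.audit=false
    log4j.appender.AUDIT=org.apache.log4j.RollingFileAppender
    log4j.appender.AUDIT.File=./carbondata-audit.log
    log4j.appender.AUDIT.layout=org.apache.log4j.PatternLayout
    log4j.appender.AUDIT.layout.ConversionPattern=%d{ISO8601} %m%n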


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a160dfb6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a160dfb6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a160dfb6

Branch: refs/heads/branch-1.5
Commit: a160dfb694f9bf023c6a2892877cb86599cc46d5
Parents: a0a0123
Author: Jacky Li <jacky.li...@qq.com>
Authored: Wed Oct 31 14:49:38 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Nov 21 22:40:11 2018 +0530

----------------------------------------------------------------------
 .../carbondata/common/logging/impl/Audit.java   |   49 -
 .../logging/ft/LoggingServiceTest_FT.java       |   93 -
 .../status/DiskBasedDataMapStatusProvider.java  |    2 -
 .../client/NonSecureDictionaryClient.java       |    3 +-
 .../NonSecureDictionaryClientHandler.java       |    3 +-
 .../IncrementalColumnDictionaryGenerator.java   |    3 +-
 .../statusmanager/SegmentStatusManager.java     |   14 +-
 .../carbondata/core/util/SessionParams.java     |   18 +-
 .../carbondata/mv/datamap/MVAnalyzerRule.scala  |    2 -
 .../examples/sql/CarbonSessionExample.java      |  137 -
 .../examples/sql/JavaCarbonSessionExample.java  |   94 +
 .../examples/CarbonSessionExample.scala         |   44 +-
 .../carbondata/examplesCI/RunExamples.scala     |    5 +
 .../client/SecureDictionaryClient.java          |    3 +-
 .../server/SecureDictionaryServer.java          |    3 +-
 .../org/apache/carbondata/api/CarbonStore.scala |    9 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |    8 -
 .../spark/rdd/PartitionSplitter.scala           |    6 -
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   12 +-
 .../command/carbonTableSchemaCommon.scala       |   21 +-
 .../sql/test/ResourceRegisterAndCopier.scala    |    4 +-
 ...CreateCarbonSourceTableAsSelectCommand.scala |   13 +-
 .../datamap/CarbonMergeBloomIndexFilesRDD.scala |    2 +-
 .../spark/rdd/AggregateDataMapCompactor.scala   |    4 -
 .../spark/rdd/CarbonDataRDDFactory.scala        |   71 +-
 .../spark/rdd/CarbonTableCompactor.scala        |   12 -
 .../carbondata/stream/StreamJobManager.scala    |   13 +-
 .../events/MergeBloomIndexEventListener.scala   |    3 +-
 .../sql/events/MergeIndexEventListener.scala    |   17 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   12 +-
 .../datamap/CarbonDataMapRebuildCommand.scala   |    3 +
 .../datamap/CarbonDataMapShowCommand.scala      |    3 +
 .../datamap/CarbonDropDataMapCommand.scala      |    6 +-
 .../CarbonAlterTableCompactionCommand.scala     |   13 +-
 .../CarbonAlterTableFinishStreaming.scala       |    3 +
 .../management/CarbonCleanFilesCommand.scala    |    7 +-
 .../command/management/CarbonCliCommand.scala   |    4 +
 .../CarbonDeleteLoadByIdCommand.scala           |    5 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |    5 +-
 .../management/CarbonInsertIntoCommand.scala    |   13 +-
 .../management/CarbonLoadDataCommand.scala      |  355 +--
 .../management/CarbonShowLoadsCommand.scala     |    3 +
 .../management/RefreshCarbonTableCommand.scala  |   18 +-
 .../CarbonProjectForDeleteCommand.scala         |    9 +-
 .../CarbonProjectForUpdateCommand.scala         |    4 +
 .../command/mutation/DeleteExecution.scala      |   12 +-
 .../command/mutation/HorizontalCompaction.scala |    6 -
 .../spark/sql/execution/command/package.scala   |   82 +-
 ...arbonAlterTableAddHivePartitionCommand.scala |    3 +
 ...rbonAlterTableDropHivePartitionCommand.scala |    3 +
 .../CarbonAlterTableDropPartitionCommand.scala  |    8 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |    9 +-
 .../CarbonShowCarbonPartitionsCommand.scala     |    3 +
 .../CarbonAlterTableAddColumnCommand.scala      |    9 +-
 .../CarbonAlterTableDataTypeChangeCommand.scala |   16 +-
 .../CarbonAlterTableDropColumnCommand.scala     |    8 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |    9 +-
 .../schema/CarbonAlterTableSetCommand.scala     |    9 +-
 .../schema/CarbonAlterTableUnsetCommand.scala   |   11 +-
 .../schema/CarbonGetTableDetailCommand.scala    |    2 +
 .../stream/CarbonCreateStreamCommand.scala      |    3 +
 .../stream/CarbonDropStreamCommand.scala        |    3 +
 .../stream/CarbonShowStreamsCommand.scala       |    3 +
 .../CarbonCreateTableAsSelectCommand.scala      |   17 +-
 .../table/CarbonCreateTableCommand.scala        |   17 +-
 .../table/CarbonDescribeFormattedCommand.scala  |    3 +
 .../command/table/CarbonDropTableCommand.scala  |    6 +-
 .../command/table/CarbonExplainCommand.scala    |    4 +-
 .../command/table/CarbonShowTablesCommand.scala |    1 +
 .../strategy/CarbonLateDecodeStrategy.scala     |    2 +-
 .../execution/command/CarbonHiveCommands.scala  |    7 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |    6 -
 .../org/apache/spark/util/AlterTableUtil.scala  |   10 +-
 .../carbondata/TestStreamingTableOpName.scala   | 2647 ++++++++++++++++++
 .../TestStreamingTableOperation.scala           | 2647 ------------------
 .../carbondata/processing/util/Auditor.java     |  222 ++
 .../carbondata/sdk/file/CarbonReaderTest.java   |    2 -
 .../store/worker/SearchRequestHandler.java      |   17 +-
 78 files changed, 3451 insertions(+), 3487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java 
b/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java
deleted file mode 100644
index 1c822b9..0000000
--- a/common/src/main/java/org/apache/carbondata/common/logging/impl/Audit.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.common.logging.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-
-public class Audit {
-  private static String hostName;
-  private static String username;
-
-  static {
-    try {
-      hostName = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
-      hostName = "localhost";
-    }
-    try {
-      username = UserGroupInformation.getCurrentUser().getShortUserName();
-    } catch (IOException e) {
-      username = "unknown";
-    }
-  }
-
-  public static void log(Logger logger, String message) {
-    String threadid = String.valueOf(Thread.currentThread().getId());
-    logger.log(AuditLevel.AUDIT,
-        "[" + hostName + "]" + "[" + username + "]" + "[Thread-" + threadid + 
"]" + message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java
----------------------------------------------------------------------
diff --git 
a/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java
 
b/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java
deleted file mode 100644
index 867a154..0000000
--- 
a/common/src/test/java/org/apache/carbondata/common/logging/ft/LoggingServiceTest_FT.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.common.logging.ft;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.log4j.Logger;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
-import org.apache.carbondata.common.logging.impl.AuditLevel;
-
-import junit.framework.TestCase;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.log4j.MDC;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class LoggingServiceTest_FT extends TestCase {
-
-  private static Logger logger =
-      LogServiceFactory.getLogService(LoggingServiceTest_FT.class.getName());
-
-  @Before public void setUp() throws Exception {
-    MDC.put("MODULE", "Function Test");
-    MDC.put("USER_NAME", "testuser");
-    MDC.put("CLIENT_IP", "127.0.0.1");
-    MDC.put("OPERATRION", "log");
-  }
-
-  @Test public void testIsAuditFileCreated() {
-    File f = new File("./unibiaudit.log");
-    Assert.assertFalse(f.exists());
-  }
-
-  @Test public void testAudit() {
-
-    String expectedAuditLine =
-        "[main] AUDIT 
[org.apache.carbondata.common.logging.ft.LoggingServiceTest_FT] 127.0.0.1 "
-            + "testuser Function Test log- audit message created";
-    Audit.log(logger, "audit message created");
-
-    LogManager.shutdown();
-
-    try {
-      FileInputStream fstream = new FileInputStream("./carbondataaudit.log");
-      BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
-      String actualAuditLine = null;
-      String strLine = null;
-      while ((strLine = br.readLine()) != null) {
-        actualAuditLine = strLine;
-      }
-
-      System.out.println(actualAuditLine);
-
-      if (actualAuditLine != null) {
-        int index = actualAuditLine.indexOf("[main]");
-        actualAuditLine = actualAuditLine.substring(index);
-        Assert.assertEquals(expectedAuditLine, actualAuditLine);
-      } else {
-        Assert.assertTrue(false);
-      }
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-      Assert.assertTrue(!false);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.assertTrue(false);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
index 07fe93b..27cb1cc 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
@@ -30,7 +30,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
@@ -148,7 +147,6 @@ public class DiskBasedDataMapStatusProvider implements 
DataMapStatusStorageProvi
       } else {
         String errorMsg = "Upadating datamapstatus is failed due to another 
process taken the lock"
             + " for updating it";
-        Audit.log(LOG, errorMsg);
         LOG.error(errorMsg);
         throw new IOException(errorMsg + " Please try after some time.");
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
 
b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
index 2e58255..d5c2072 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClient.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.dictionary.client;
 import java.net.InetSocketAddress;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 
 import io.netty.bootstrap.Bootstrap;
@@ -52,7 +51,7 @@ public class NonSecureDictionaryClient implements 
DictionaryClient {
    */
   @Override public void startClient(String secretKey, String address, int port,
       boolean encryptSecureServer) {
-    Audit.log(LOGGER, "Starting client on " + address + " " + port);
+    LOGGER.info("Starting client on " + address + " " + port);
     long start = System.currentTimeMillis();
     // Create an Event with 1 thread.
     workerGroup = new NioEventLoopGroup(1);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
 
b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
index 457441f..17e9c7c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/dictionary/client/NonSecureDictionaryClientHandler.java
@@ -21,7 +21,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 
 import io.netty.buffer.ByteBuf;
@@ -49,7 +48,7 @@ public class NonSecureDictionaryClientHandler extends 
ChannelInboundHandlerAdapt
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
     this.ctx = ctx;
     channelFutureListener = new DictionaryChannelFutureListener(ctx);
-    Audit.log(LOGGER, "Connected client " + ctx);
+    LOGGER.info("Connected client " + ctx);
     super.channelActive(ctx);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
 
b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
index bf0f094..7e4be5d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -148,7 +147,7 @@ public class IncrementalColumnDictionaryGenerator 
implements BiDictionary<Intege
     long sortIndexWriteTime = System.currentTimeMillis() - t3;
     // update Meta Data
     updateMetaData(dictionaryWriter);
-    Audit.log(LOGGER, "\n columnName: " + dimension.getColName() +
+    LOGGER.info("\n columnName: " + dimension.getColName() +
             "\n columnId: " + dimension.getColumnId() +
             "\n new distinct values count: " + distinctValues.size() +
             "\n create dictionary cache: " + dictCacheTime +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index fbb765b..32b1e78 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -371,7 +370,6 @@ public class SegmentStatusManager {
               String errorMsg = "Delete segment by id is failed for " + 
tableDetails
                   + ". Not able to acquire the table status lock due to other 
operation running "
                   + "in the background.";
-              Audit.log(LOG, errorMsg);
               LOG.error(errorMsg);
               throw new Exception(errorMsg + " Please try after some time.");
             }
@@ -381,7 +379,7 @@ public class SegmentStatusManager {
           }
 
         } else {
-          Audit.log(LOG, "Delete segment by Id is failed. No matching segment 
id found.");
+          LOG.error("Delete segment by Id is failed. No matching segment id 
found.");
           return loadIds;
         }
 
@@ -389,7 +387,6 @@ public class SegmentStatusManager {
         String errorMsg = "Delete segment by id is failed for " + tableDetails
             + ". Not able to acquire the delete segment lock due to another 
delete "
             + "operation is running in the background.";
-        Audit.log(LOG, errorMsg);
         LOG.error(errorMsg);
         throw new Exception(errorMsg + " Please try after some time.");
       }
@@ -453,7 +450,6 @@ public class SegmentStatusManager {
               String errorMsg = "Delete segment by date is failed for " + 
tableDetails
                   + ". Not able to acquire the table status lock due to other 
operation running "
                   + "in the background.";
-              Audit.log(LOG, errorMsg);
               LOG.error(errorMsg);
               throw new Exception(errorMsg + " Please try after some time.");
 
@@ -463,7 +459,7 @@ public class SegmentStatusManager {
           }
 
         } else {
-          Audit.log(LOG, "Delete segment by date is failed. No matching 
segment found.");
+          LOG.error("Delete segment by date is failed. No matching segment 
found.");
           invalidLoadTimestamps.add(loadDate);
           return invalidLoadTimestamps;
         }
@@ -472,7 +468,6 @@ public class SegmentStatusManager {
         String errorMsg = "Delete segment by date is failed for " + 
tableDetails
             + ". Not able to acquire the delete segment lock due to another 
delete "
             + "operation is running in the background.";
-        Audit.log(LOG, errorMsg);
         LOG.error(errorMsg);
         throw new Exception(errorMsg + " Please try after some time.");
       }
@@ -579,7 +574,7 @@ public class SegmentStatusManager {
       }
 
       if (!loadFound) {
-        Audit.log(LOG, "Delete segment by ID is failed. No matching segment id 
found :" + loadId);
+        LOG.error("Delete segment by ID is failed. No matching segment id 
found :" + loadId);
         invalidLoadIds.add(loadId);
         return invalidLoadIds;
       }
@@ -633,7 +628,7 @@ public class SegmentStatusManager {
 
     if (!loadFound) {
       invalidLoadTimestamps.add(loadDate);
-      Audit.log(LOG, "Delete segment by date is failed. No matching segment 
found.");
+      LOG.error("Delete segment by date is failed. No matching segment 
found.");
       return invalidLoadTimestamps;
     }
     return invalidLoadTimestamps;
@@ -992,7 +987,6 @@ public class SegmentStatusManager {
                 dbName + "." + tableName +
                 ". Not able to acquire the table status lock due to other 
operation " +
                 "running in the background.";
-            Audit.log(LOG, errorMsg);
             LOG.error(errorMsg);
             throw new IOException(errorMsg + " Please try after some time.");
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java 
b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 2181107..f87784e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
@@ -99,26 +98,13 @@ public class SessionParams implements Serializable, 
Cloneable {
    * @return properties value
    */
   public SessionParams addProperty(String key, String value) throws 
InvalidConfigurationException {
-    return addProperty(key, value, true);
-  }
-
-  /**
-   * This method will be used to add a new property
-   *
-   * @param key
-   * @return properties value
-   */
-  public SessionParams addProperty(String key, String value, Boolean 
doAuditing)
-      throws InvalidConfigurationException {
     boolean isValidConf = validateKeyValue(key, value);
     if (isValidConf) {
       if 
(key.equals(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)) {
         value = value.toUpperCase();
       }
-      if (doAuditing) {
-        Audit.log(LOGGER,
-            "The key " + key + " with value " + value + " added in the session 
param");
-      }
+      LOGGER.info(
+          "The key " + key + " with value " + value + " added in the session 
param");
       sProps.put(key, value);
     }
     return this;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index 5dc6b27..bb56e58 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -72,7 +71,6 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
       val modularPlan = 
catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
       if (modularPlan.find (_.rewritten).isDefined) {
         val compactSQL = modularPlan.asCompactSQL
-        Audit.log(LOGGER, s"\n$compactSQL\n")
         val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
         analyzed
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java
 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java
deleted file mode 100644
index 37a12e4..0000000
--- 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/CarbonSessionExample.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples.sql;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.examples.util.ExampleUtils;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.spark.sql.SparkSession;
-
-import java.io.File;
-import java.io.IOException;
-
-public class CarbonSessionExample {
-
-  public static void main(String[] args) {
-    File file = new File(CarbonSessionExample.class.getResource("/").getPath() 
+ "../..");
-    try {
-      String rootPath = file.getCanonicalPath();
-      System.setProperty("path.target", rootPath + "/examples/spark2/target");
-      PropertyConfigurator.configure(rootPath + 
"/examples/spark2/src/main/resources/log4j.properties");
-      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
 "true");
-      SparkSession carbon = 
ExampleUtils.createCarbonSession("JavaCarbonSessionExample", 1);
-      carbon.sparkContext().setLogLevel("INFO");
-
-      carbon.sql("DROP TABLE IF EXISTS carbonsession_table");
-      carbon.sql("DROP TABLE IF EXISTS stored_as_carbondata_table");
-
-      carbon.sql(
-              "CREATE TABLE carbonsession_table( " +
-                      "shortField SHORT, " +
-                      "intField INT, " +
-                      "bigintField LONG, " +
-                      "doubleField DOUBLE, " +
-                      "stringField STRING, " +
-                      "timestampField TIMESTAMP, " +
-                      "decimalField DECIMAL(18,2), " +
-                      "dateField DATE, " +
-                      "charField CHAR(5), " +
-                      "floatField FLOAT " +
-                      ") " +
-                      "STORED AS carbondata" +
-                      "TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, 
charField')"
-      );
-
-      String path = rootPath + "/examples/spark2/src/main/resources/data.csv";
-      carbon.sql(
-              "LOAD DATA LOCAL INPATH " + "\'" + path + "\' " +
-                      "INTO TABLE carbonsession_table " +
-                      "OPTIONS('HEADER'='true', 
'COMPLEX_DELIMITER_LEVEL_1'='#')"
-      );
-
-      carbon.sql(
-              "SELECT charField, stringField, intField " +
-                      "FROM carbonsession_table " +
-                      "WHERE stringfield = 'spark' AND decimalField > 40"
-      ).show();
-
-      carbon.sql(
-              "SELECT * " +
-                      "FROM carbonsession_table WHERE length(stringField) = 5"
-      ).show();
-
-      carbon.sql(
-              "SELECT * " +
-                      "FROM carbonsession_table " +
-                      "WHERE date_format(dateField, \'yyyy-MM-dd \') = 
\'2015-07-23\'"
-      ).show();
-
-      carbon.sql("SELECT count(stringField) FROM carbonsession_table").show();
-
-      carbon.sql(
-              "SELECT sum(intField), stringField " +
-                      "FROM carbonsession_table " +
-                      "GROUP BY stringField"
-      ).show();
-
-      carbon.sql(
-              "SELECT t1.*, t2.* " +
-                      "FROM carbonsession_table t1, carbonsession_table t2 " +
-                      "WHERE t1.stringField = t2.stringField"
-      ).show();
-
-      carbon.sql(
-              "WITH t1 AS ( " +
-                      "SELECT * FROM carbonsession_table " +
-                      "UNION ALL " +
-                      "SELECT * FROM carbonsession_table" +
-                      ") " +
-                      "SELECT t1.*, t2.* " +
-                      "FROM t1, carbonsession_table t2 " +
-                      "WHERE t1.stringField = t2.stringField"
-      ).show();
-
-      carbon.sql(
-              "SELECT * " +
-                      "FROM carbonsession_table " +
-                      "WHERE stringField = 'spark' and floatField > 2.8"
-      ).show();
-
-      carbon.sql(
-              "CREATE TABLE stored_as_carbondata_table( " +
-                      "name STRING, " +
-                      "age INT" +
-                      ") " +
-                      "STORED AS carbondata"
-      );
-
-      carbon.sql("INSERT INTO stored_as_carbondata_table VALUES ('Bob',28) ");
-      carbon.sql("SELECT * FROM stored_as_carbondata_table").show();
-
-      carbon.sql("DROP TABLE IF EXISTS carbonsession_table");
-      carbon.sql("DROP TABLE IF EXISTS stored_as_carbondata_table");
-
-      carbon.close();
-    }
-    catch (IOException e) {
-      e.printStackTrace();
-      System.out.println(e.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java
 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java
new file mode 100644
index 0000000..db2c4fd
--- /dev/null
+++ 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sql/JavaCarbonSessionExample.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.sql;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.examples.util.ExampleUtils;
+
+import org.apache.spark.sql.CarbonSession;
+import org.apache.spark.sql.SparkSession;
+
+public class JavaCarbonSessionExample {
+
+  public static void main(String[] args) throws IOException {
+    // set timestamp and date format used in data.csv for loading
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, 
"yyyy/MM/dd HH:mm:ss")
+        .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd");
+
+    // create CarbonSession
+
+    SparkSession.Builder builder = SparkSession.builder()
+        .master("local")
+        .appName("JavaCarbonSessionExample")
+        .config("spark.driver.host", "localhost");
+
+    SparkSession carbon = new CarbonSession.CarbonBuilder(builder)
+        .getOrCreateCarbonSession();
+
+    exampleBody(carbon);
+    carbon.close();
+  }
+
+  public static void exampleBody(SparkSession carbon) throws IOException {
+    carbon.sql("DROP TABLE IF EXISTS source");
+
+    carbon.sql(
+        "CREATE TABLE source( " + "shortField SHORT, " + "intField INT, " + 
"bigintField LONG, "
+            + "doubleField DOUBLE, " + "stringField STRING, " + 
"timestampField TIMESTAMP, "
+            + "decimalField DECIMAL(18,2), " + "dateField DATE, " + "charField 
CHAR(5), "
+            + "floatField FLOAT " + ") " + "STORED AS carbondata");
+
+    String rootPath =
+        new File(JavaCarbonSessionExample.class.getResource("/").getPath() + 
"../../../..")
+            .getCanonicalPath();
+    String path = rootPath + "/examples/spark2/src/main/resources/data.csv";
+    carbon.sql("LOAD DATA LOCAL INPATH " + "\'" + path + "\' " + "INTO TABLE 
source "
+        + "OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')");
+
+    carbon.sql("SELECT charField, stringField, intField " + "FROM source "
+        + "WHERE stringfield = 'spark' AND decimalField > 40").show();
+
+    carbon.sql("SELECT * " + "FROM source WHERE length(stringField) = 
5").show();
+
+    carbon.sql("SELECT * " + "FROM source "
+        + "WHERE date_format(dateField, \'yyyy-MM-dd \') = 
\'2015-07-23\'").show();
+
+    carbon.sql("SELECT count(stringField) FROM source").show();
+
+    carbon.sql("SELECT sum(intField), stringField " + "FROM source " + "GROUP 
BY stringField")
+        .show();
+
+    carbon.sql("SELECT t1.*, t2.* " + "FROM source t1, source t2 "
+        + "WHERE t1.stringField = t2.stringField").show();
+
+    carbon.sql(
+        "WITH t1 AS ( " + "SELECT * FROM source " + "UNION ALL " + "SELECT * 
FROM source" + ") "
+            + "SELECT t1.*, t2.* " + "FROM t1, source t2 "
+            + "WHERE t1.stringField = t2.stringField").show();
+
+    carbon.sql("SELECT * " + "FROM source " + "WHERE stringField = 'spark' and 
floatField > 2.8")
+        .show();
+
+    carbon.sql("DROP TABLE IF EXISTS source");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index e63e852..b6921f2 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -49,13 +49,12 @@ object CarbonSessionExample {
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
 
-    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
-    spark.sql("DROP TABLE IF EXISTS stored_as_carbondata_table")
+    spark.sql("DROP TABLE IF EXISTS source")
 
     // Create table
     spark.sql(
       s"""
-         | CREATE TABLE carbonsession_table(
+         | CREATE TABLE source(
          | shortField SHORT,
          | intField INT,
          | bigintField LONG,
@@ -67,8 +66,7 @@ object CarbonSessionExample {
          | charField CHAR(5),
          | floatField FLOAT
          | )
-         | STORED BY 'carbondata'
-         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+         | STORED AS carbondata
        """.stripMargin)
 
     val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
@@ -77,7 +75,7 @@ object CarbonSessionExample {
     spark.sql(
       s"""
          | LOAD DATA LOCAL INPATH '$path'
-         | INTO TABLE carbonsession_table
+         | INTO TABLE source
          | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
        """.stripMargin)
     // scalastyle:on
@@ -85,70 +83,58 @@ object CarbonSessionExample {
     spark.sql(
       s"""
          | SELECT charField, stringField, intField
-         | FROM carbonsession_table
+         | FROM source
          | WHERE stringfield = 'spark' AND decimalField > 40
       """.stripMargin).show()
 
     spark.sql(
       s"""
          | SELECT *
-         | FROM carbonsession_table WHERE length(stringField) = 5
+         | FROM source WHERE length(stringField) = 5
        """.stripMargin).show()
 
     spark.sql(
       s"""
          | SELECT *
-         | FROM carbonsession_table WHERE date_format(dateField, "yyyy-MM-dd") 
= "2015-07-23"
+         | FROM source WHERE date_format(dateField, "yyyy-MM-dd") = 
"2015-07-23"
        """.stripMargin).show()
 
-    spark.sql("SELECT count(stringField) FROM carbonsession_table").show()
+    spark.sql("SELECT count(stringField) FROM source").show()
 
     spark.sql(
       s"""
          | SELECT sum(intField), stringField
-         | FROM carbonsession_table
+         | FROM source
          | GROUP BY stringField
        """.stripMargin).show()
 
     spark.sql(
       s"""
          | SELECT t1.*, t2.*
-         | FROM carbonsession_table t1, carbonsession_table t2
+         | FROM source t1, source t2
          | WHERE t1.stringField = t2.stringField
       """.stripMargin).show()
 
     spark.sql(
       s"""
          | WITH t1 AS (
-         | SELECT * FROM carbonsession_table
+         | SELECT * FROM source
          | UNION ALL
-         | SELECT * FROM carbonsession_table
+         | SELECT * FROM source
          | )
          | SELECT t1.*, t2.*
-         | FROM t1, carbonsession_table t2
+         | FROM t1, source t2
          | WHERE t1.stringField = t2.stringField
       """.stripMargin).show()
 
     spark.sql(
       s"""
          | SELECT *
-         | FROM carbonsession_table
+         | FROM source
          | WHERE stringField = 'spark' and floatField > 2.8
        """.stripMargin).show()
 
-    spark.sql(
-      s"""
-         | CREATE TABLE stored_as_carbondata_table(
-         |    name STRING,
-         |    age INT
-         |    )
-         | STORED AS carbondata
-       """.stripMargin)
-    spark.sql("INSERT INTO stored_as_carbondata_table VALUES ('Bob',28) ")
-    spark.sql("SELECT * FROM stored_as_carbondata_table").show()
-
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
-    spark.sql("DROP TABLE IF EXISTS stored_as_carbondata_table")
+    spark.sql("DROP TABLE IF EXISTS source")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
 
b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
index 6a13dc3..25ef9af 100644
--- 
a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
+++ 
b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
@@ -24,6 +24,7 @@ import org.apache.carbondata.examples._
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.sdk.CarbonReaderExample
+import org.apache.carbondata.examples.sql.JavaCarbonSessionExample
 
 /**
  * Test suite for examples
@@ -67,6 +68,10 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
     CarbonSessionExample.exampleBody(spark)
   }
 
+  test("JavaCarbonSessionExample") {
+    JavaCarbonSessionExample.exampleBody(spark)
+  }
+
   test("CarbonSortColumnsExample") {
     CarbonSortColumnsExample.exampleBody(spark)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
index 3aa7fbf..088d9e2 100644
--- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/client/SecureDictionaryClient.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.dictionary.client;
 import java.nio.charset.Charset;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
@@ -60,7 +59,7 @@ public class SecureDictionaryClient implements 
DictionaryClient {
    */
   @Override public void startClient(String secretKey, String address, int port,
       boolean encryptSecureServer) {
-    Audit.log(LOGGER, "Starting client on " + address + " " + port);
+    LOGGER.info("Starting client on " + address + " " + port);
     long start = System.currentTimeMillis();
 
     SecurityManager securityMgr;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
index a029da0..9ae6d35 100644
--- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import 
org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
@@ -145,7 +144,7 @@ public class SecureDictionaryServer extends 
AbstractDictionaryServer implements
         //iteratively listening to newports
         context
             .createServer(host, newPort, 
Lists.<TransportServerBootstrap>newArrayList(bootstrap));
-        Audit.log(LOGGER,
+        LOGGER.info(
             "Dictionary Server started, Time spent " + 
(System.currentTimeMillis() - start)
             + " Listening on port " + newPort);
         this.port = newPort;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index df173cd..45d472e 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -29,7 +29,6 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.carbondata.common.Strings
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -159,7 +158,6 @@ object CarbonStore {
       carbonTable: CarbonTable,
       forceTableClean: Boolean,
       currentTablePartitions: Option[Seq[PartitionSpec]] = None): Unit = {
-    Audit.log(LOGGER, s"The clean files request has been received for 
$dbName.$tableName")
     var carbonCleanFilesLock: ICarbonLock = null
     val absoluteTableIdentifier = if (forceTableClean) {
       AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
@@ -203,7 +201,6 @@ object CarbonStore {
         CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
       }
     }
-    Audit.log(LOGGER, s"Clean files operation is success for 
$dbName.$tableName.")
   }
 
   /**
@@ -282,7 +279,6 @@ object CarbonStore {
       tableName: String,
       carbonTable: CarbonTable): Unit = {
 
-    Audit.log(LOGGER, s"Delete segment by Id request has been received for 
$dbName.$tableName")
     validateLoadIds(loadids)
 
     val path = carbonTable.getMetadataPath
@@ -291,7 +287,7 @@ object CarbonStore {
       val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
         carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
       if (invalidLoadIds.isEmpty) {
-        Audit.log(LOGGER, s"Delete segment by Id is successfull for 
$dbName.$tableName.")
+        LOGGER.info(s"Delete segment by Id is successfull for 
$dbName.$tableName.")
       } else {
         sys.error(s"Delete segment by Id is failed. Invalid ID is: 
${invalidLoadIds.mkString(",")}")
       }
@@ -308,7 +304,6 @@ object CarbonStore {
       dbName: String,
       tableName: String,
       carbonTable: CarbonTable): Unit = {
-    Audit.log(LOGGER, s"Delete segment by Id request has been received for 
$dbName.$tableName")
 
     val time = validateTimeFormat(timestamp)
     val path = carbonTable.getMetadataPath
@@ -321,7 +316,7 @@ object CarbonStore {
           path,
           time).asScala
       if (invalidLoadTimestamps.isEmpty) {
-        Audit.log(LOGGER, s"Delete segment by date is successful for 
$dbName.$tableName.")
+        LOGGER.info(s"Delete segment by date is successful for 
$dbName.$tableName.")
       } else {
         sys.error("Delete segment by date is failed. No matching segment 
found.")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index 353a478..e215ab0 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -22,9 +22,7 @@ import java.io.IOException
 import org.apache.spark.sql.execution.command.{AlterPartitionModel, 
DropPartitionCallableModel}
 import org.apache.spark.util.PartitionUtils
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory}
 
@@ -89,8 +87,6 @@ object PartitionDropper {
             finalDropStatus = dropStatus.forall(_._2)
           }
           if (!finalDropStatus) {
-            Audit.log(logger, s"Drop Partition request failed for table " +
-                         s"${ dbName }.${ tableName }")
             logger.error(s"Drop Partition request failed for table " +
                          s"${ dbName }.${ tableName }")
           }
@@ -105,8 +101,6 @@ object PartitionDropper {
             case e: IOException =>
               throw new IOException("Exception while delete original carbon 
files ", e)
           }
-          Audit.log(logger, s"Drop Partition request completed for table " +
-                       s"${ dbName }.${ tableName }")
           logger.info(s"Drop Partition request completed for table " +
                       s"${ dbName }.${ tableName }")
         }
@@ -117,8 +111,6 @@ object PartitionDropper {
     } else {
       PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, 
absoluteTableIdentifier,
         Seq(partitionId).toList, dbName, tableName, partitionInfo)
-      Audit.log(logger, s"Drop Partition request completed for table " +
-                   s"${ dbName }.${ tableName }")
       logger.info(s"Drop Partition request completed for table " +
                   s"${ dbName }.${ tableName }")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index 369ad51..f99b5e2 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -22,9 +22,7 @@ import java.io.IOException
 import org.apache.spark.sql.execution.command.{AlterPartitionModel, 
SplitPartitionCallableModel}
 import org.apache.spark.util.PartitionUtils
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory}
 
 object PartitionSplitter {
@@ -75,8 +73,6 @@ object PartitionSplitter {
          finalSplitStatus = splitStatus.forall(_._2)
        }
        if (!finalSplitStatus) {
-         Audit.log(logger, s"Add/Split Partition request failed for table " +
-                      s"${ databaseName }.${ tableName }")
          logger.error(s"Add/Split Partition request failed for table " +
                       s"${ databaseName }.${ tableName }")
        }
@@ -90,8 +86,6 @@ object PartitionSplitter {
          case e: IOException =>
            throw new IOException("Exception while delete original carbon files 
", e)
        }
-       Audit.log(logger, s"Add/Split Partition request completed for table " +
-                    s"${ databaseName }.${ tableName }")
        logger.info(s"Add/Split Partition request completed for table " +
                    s"${ databaseName }.${ tableName }")
      }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 13172c7..606aa01 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -21,24 +21,19 @@ import java.text.SimpleDateFormat
 import java.util
 import java.util.{Date, UUID}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{Job, RecordReader, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, 
TaskContext}
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
@@ -346,8 +341,6 @@ object StreamHandoffRDD {
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, 
carbonLoadModel.getSegmentId.toInt)
       LOGGER.info("********clean up done**********")
-      Audit.log(LOGGER, s"Handoff is failed for " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
       LOGGER.error("Cannot write load metadata file as handoff failed")
       throw new Exception(errorMessage)
     }
@@ -367,9 +360,6 @@ object StreamHandoffRDD {
       OperationListenerBus.getInstance()
         .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       if (!done) {
-        val errorMessage = "Handoff failed due to failure in table status 
updation."
-        Audit.log(LOGGER, "Handoff is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
         LOGGER.error("Handoff failed due to failure in table status updation.")
         throw new Exception(errorMessage)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e051456..3bb7731 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -22,20 +22,15 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
@@ -45,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType
@@ -281,20 +276,10 @@ class AlterTableColumnSchemaGenerator(
     allColumns.filter(x => !x.isInvisible).groupBy(_.getColumnName)
       .foreach(f => if (f._2.size > 1) {
         val name = f._1
-        LOGGER.error(s"Duplicate column found with name: $name")
-        Audit.log(LOGGER,
-          s"Validation failed for Create/Alter Table Operation " +
-          s"for ${ dbName }.${ alterTableModel.tableName }. " +
-          s"Duplicate column found with name: $name")
         sys.error(s"Duplicate column found with name: $name")
       })
 
     if (newCols.exists(_.getDataType.isComplexType)) {
-      LOGGER.error(s"Complex column cannot be added")
-      Audit.log(LOGGER,
-        s"Validation failed for Create/Alter Table Operation " +
-        s"for ${ dbName }.${ alterTableModel.tableName }. " +
-        s"Complex column cannot be added")
       sys.error(s"Complex column cannot be added")
     }
 
@@ -782,10 +767,6 @@ class TableNewProcessor(cm: TableModel) {
     allColumns.groupBy(_.getColumnName).foreach { f =>
       if (f._2.size > 1) {
         val name = f._1
-        LOGGER.error(s"Duplicate column found with name: $name")
-        Audit.log(LOGGER,
-          s"Validation failed for Create/Alter Table Operation " +
-          s"Duplicate column found with name: $name")
         CarbonException.analysisException(s"Duplicate dimensions found with name: $name")
       }
     }
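
The validation that survives in both hunks above is a plain groupBy on column name: any group with more than one entry aborts the operation, while the audit trail is now written by the command framework instead of the removed inline Audit.log calls. A minimal standalone sketch of that duplicate-check pattern (plain strings stand in for the real ColumnSchema objects, so the names are illustrative only):

    object DuplicateColumnCheckSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical column names; the real code groups ColumnSchema objects
        val allColumns = Seq("id", "name", "city", "name")
        allColumns.groupBy(identity).foreach { case (name, group) =>
          if (group.size > 1) {
            // mirrors the sys.error / analysisException calls kept in the diff
            sys.error(s"Duplicate column found with name: $name")
          }
        }
      }
    }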

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index 87106e0..ff9b8c4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -24,9 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.io.IOUtils
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.HdfsFileLock
 import org.apache.carbondata.core.util.CarbonUtil
@@ -50,7 +48,7 @@ object ResourceRegisterAndCopier {
     if (!file.exists()) {
       sys.error(s"""Provided path $hdfsPath does not exist""")
     }
-    Audit.log(LOGGER, "Try downloading resource data")
+    LOGGER.info("Try downloading resource data")
     val lock = new HdfsFileLock(hdfsPath, "/resource.lock")
     var bool = false
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
index 74c0f97..ee9fb0f 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -22,7 +22,7 @@ import java.net.URI
 
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, AtomicRunnableCommand, RunnableCommand}
 import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
@@ -41,11 +41,15 @@ case class CreateCarbonSourceTableAsSelectCommand(
     table: CatalogTable,
     mode: SaveMode,
     query: LogicalPlan)
-  extends RunnableCommand {
+  extends AtomicRunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] ={
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
 
@@ -53,6 +57,7 @@ case class CreateCarbonSourceTableAsSelectCommand(
     val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
+    setAuditTable(db, table.identifier.table)
 
     if (sessionState.catalog.tableExists(tableIdentWithDB)) {
       assert(mode != SaveMode.Overwrite,
@@ -127,4 +132,6 @@ case class CreateCarbonSourceTableAsSelectCommand(
         throw ex
     }
   }
+
+  override protected def opName: String = "CREATE TABLE AS SELECT"
 }
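
For orientation: the hunk above converts the command to AtomicRunnableCommand and registers its table via setAuditTable and its operation via opName, so the audit record is emitted by the framework rather than by hand. The following is a schematic sketch of that contract only, inferred from the calls visible in this hunk; apart from the setAuditTable, opName, processMetadata and processData names, everything here is an assumption, not the real trait definition:

    // Schematic only -- not the actual framework code.
    abstract class AuditedCommandSketch {
      private var auditTable: String = _

      // commands call this with the db/table they operate on (see hunk above)
      protected def setAuditTable(db: String, table: String): Unit =
        auditTable = s"$db.$table"

      protected def opName: String  // e.g. "CREATE TABLE AS SELECT"

      def processMetadata(session: Object): Seq[AnyRef]  // metadata phase
      def processData(session: Object): Seq[AnyRef]      // data phase

      // the framework runs both phases and emits one audit record per command
      final def run(session: Object): Seq[AnyRef] = {
        val rows = processMetadata(session) ++ processData(session)
        println(s"AUDIT op=$opName table=$auditTable")  // stand-in for the real audit sink
        rows
      }
    }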

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
index 1aa8b82..c9d5eb1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
@@ -34,7 +34,7 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
  * RDD to merge all bloomindex files of specified segment for bloom datamap
  */
 class CarbonMergeBloomIndexFilesRDD(
-  @transient ss: SparkSession,
+  @transient private val ss: SparkSession,
   carbonTable: CarbonTable,
   segmentIds: Seq[String],
   bloomDatamapNames: Seq[String],
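
The one-line change above is about serialization: on a plain constructor parameter, @transient may never reach the generated backing field, so the non-serializable SparkSession could still be dragged into the serialized RDD; declaring it @transient private val pins the annotation to the field itself. A self-contained sketch of the effect, under the assumption of plain java.io serialization and an illustrative class name:

    import java.io._

    // `session` is skipped during serialization because the field is transient;
    // without the explicit `private val`, the annotation may not reach the field.
    class HolderSketch(@transient private val session: Object, val name: String)
      extends Serializable {
      def sessionIsNull: Boolean = session == null
    }

    object TransientDemo {
      def main(args: Array[String]): Unit = {
        val bytes = new ByteArrayOutputStream()
        new ObjectOutputStream(bytes).writeObject(new HolderSketch(new Object, "seg_0"))
        val restored = new ObjectInputStream(
          new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[HolderSketch]
        println(restored.name)          // seg_0
        println(restored.sessionIsNull) // true: the transient field was not written
      }
    }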

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index a0bdd64..2119e4c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -25,8 +25,6 @@ import org.apache.spark.sql.execution.command.CompactionModel
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -132,8 +130,6 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
                                   carbonLoadModel.getTableName)
         LOGGER
           .info(s"Compaction request for datamap ${ carbonTable.getTableUniqueName } is successful")
-        Audit.log(LOGGER,
-          s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2d0bc58..dc52517 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -28,27 +28,24 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
 
-import com.univocity.parsers.common.TextParsingException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -68,16 +65,15 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent,
 LoadTablePreStatusUpdateEvent}
-import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.processing.util.{Auditor, CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
@@ -132,8 +128,6 @@ object CarbonDataRDDFactory {
           }
       }
     } else {
-      Audit.log(LOGGER, "Not able to acquire the system level compaction lock for table " +
-          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
@@ -308,9 +302,7 @@ object CarbonDataRDDFactory {
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None,
-      operationContext: OperationContext): Unit = {
-    Audit.log(LOGGER, s"Data load request has been received for table" +
-                 s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      operationContext: OperationContext): LoadMetadataDetails = {
     // Check if any load need to be deleted before loading new data
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
@@ -428,11 +420,11 @@ object CarbonDataRDDFactory {
     if (updateModel.isDefined) {
       if (loadStatus == SegmentStatus.LOAD_FAILURE) {
         CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
-        return
+        return null
       } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
                  updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
                  carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-        return
+        return null
       } else {
         // in success case handle updation of the table status file.
         // success case.
@@ -451,9 +443,7 @@ object CarbonDataRDDFactory {
         // this means that the update doesnt have any records to update so no need to do table
         // status file updation.
         if (resultSize == 0) {
-          Audit.log(LOGGER, "Data update is successful with 0 rows updation for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          return
+          return null
         }
         if (CarbonUpdateUtil.updateTableMetadataStatus(
           segmentDetails,
@@ -462,20 +452,15 @@ object CarbonDataRDDFactory {
           true,
           new util.ArrayList[Segment](0),
           new util.ArrayList[Segment](segmentFiles), "")) {
-          Audit.log(LOGGER, "Data update is successful for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         } else {
-          val errorMessage = "Data update failed due to failure in table status updation."
-          Audit.log(LOGGER, "Data update is failed for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           LOGGER.error("Data update failed due to failure in table status updation.")
           updateModel.get.executorErrors.errorMsg = errorMessage
           updateModel.get.executorErrors.failureCauses = FailureCauses
             .STATUS_FILE_UPDATION_FAILURE
-          return
+          return null
         }
       }
-      return
+      return null
     }
     val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String]
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
@@ -488,8 +473,6 @@ object CarbonDataRDDFactory {
         clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
       }
       LOGGER.info("********clean up done**********")
-      Audit.log(LOGGER, s"Data load is failed for " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.warn("Cannot write load metadata file as data load failed")
       throw new Exception(errorMessage)
     } else {
@@ -507,8 +490,6 @@ object CarbonDataRDDFactory {
           clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
-        Audit.log(LOGGER, s"Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         throw new Exception(status(0)._2._2.errorMsg)
       }
       // as no record loaded in new segment, new segment should be deleted
@@ -542,11 +523,10 @@ object CarbonDataRDDFactory {
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel)
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-      val done =
+      val (done, writtenSegment) =
         updateTableStatus(
           status,
           carbonLoadModel,
-          loadStatus,
           newEntryLoadStatus,
           overwriteTable,
           segmentFileName,
@@ -575,18 +555,17 @@ object CarbonDataRDDFactory {
           clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
-        Audit.log(LOGGER, "Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error("Data load failed due to failure in table status updation.")
         throw new Exception("Data load failed due to failure in table status updation.")
       }
       if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
-        Audit.log(LOGGER, "Data load is partially successful for " +
+        LOGGER.info("Data load is partially successful for " +
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       } else {
-        Audit.log(LOGGER, "Data load is successful for " +
+        LOGGER.info("Data load is successful for " +
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
+
       try {
         // compaction handling
         if (carbonTable.isHivePartitionTable) {
@@ -605,6 +584,7 @@ object CarbonDataRDDFactory {
             operationContext)
           carbonLoadModel.setMergedSegmentIds(compactedSegments)
         }
+        writtenSegment
       } catch {
         case e: Exception =>
           throw new Exception(
@@ -845,8 +825,6 @@ object CarbonDataRDDFactory {
                s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
     if (!carbonTable.isChildDataMap &&
         CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
-      Audit.log(LOGGER, s"Compaction request received for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0
       val isCompactionTriggerByDDl = false
       val compactionModel = CompactionModel(
@@ -905,8 +883,6 @@ object CarbonDataRDDFactory {
               throw e
           }
         } else {
-          Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
           LOGGER.error("Not able to acquire the compaction lock for table " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
         }
@@ -916,15 +892,22 @@ object CarbonDataRDDFactory {
 
   /**
    * Update table status file after data loading
+   * @param status status collected from each task
+   * @param carbonLoadModel load model used for loading
+   * @param newEntryLoadStatus segment status to set in the metadata
+   * @param overwriteTable true the operation is overwrite
+   * @param segmentFileName segment file name
+   * @param uuid uuid for the table status file name
+   * @return whether operation success and
+   *         the segment metadata that written into the segment status file
    */
   private def updateTableStatus(
       status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
       carbonLoadModel: CarbonLoadModel,
-      loadStatus: SegmentStatus,
       newEntryLoadStatus: SegmentStatus,
       overwriteTable: Boolean,
       segmentFileName: String,
-      uuid: String = ""): Boolean = {
+      uuid: String = ""): (Boolean, LoadMetadataDetails) = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
       status(0)._2._1
@@ -948,14 +931,12 @@ object CarbonDataRDDFactory {
     if (!done) {
       val errorMessage = s"Dataload failed due to failure in table status updation for" +
                          s" ${carbonLoadModel.getTableName}"
-      Audit.log(LOGGER, "Data load is failed for " +
-                   s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-      LOGGER.error("Dataload failed due to failure in table status updation.")
+      LOGGER.error(errorMessage)
       throw new Exception(errorMessage)
     } else {
       DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
     }
-    done
+    (done, metadataDetails)
   }
 
   /**

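The net effect in this file: updateTableStatus now returns both the success flag and the LoadMetadataDetails entry it wrote, and loadCarbonData propagates that entry to its caller, as the val (done, writtenSegment) destructuring above shows. A simplified sketch of the new contract, with types reduced to a stub case class rather than the real signatures:

    object UpdateStatusSketch {
      // stand-in for LoadMetadataDetails: just the fields the sketch needs
      final case class SegmentEntrySketch(segmentId: String, status: String)

      def updateTableStatus(
          recordSucceeded: Boolean,
          entry: SegmentEntrySketch): (Boolean, SegmentEntrySketch) = {
        if (!recordSucceeded) {
          // mirrors the error path kept in the diff
          throw new Exception(s"Dataload failed due to failure in table status updation for ${entry.segmentId}")
        }
        (recordSucceeded, entry)
      }

      def main(args: Array[String]): Unit = {
        val (done, writtenSegment) =
          updateTableStatus(recordSucceeded = true, SegmentEntrySketch("0", "Success"))
        println(s"done=$done segment=${writtenSegment.segmentId}")
      }
    }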
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index ac83212..16309fe 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -27,12 +27,9 @@ import scala.collection.mutable
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
-import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
@@ -304,25 +301,16 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       // true because compaction for all datamaps will be finished at a time to the maximum level
       // possible (level 1, 2 etc). so we need to check for either condition
       if (!statusFileUpdation || !commitComplete) {
-        Audit.log(LOGGER,
-          s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
-                     s"${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
         throw new Exception(s"Compaction failed to update metadata for table" +
                             s" ${ carbonLoadModel.getDatabaseName }." +
                             s"${ carbonLoadModel.getTableName }")
       } else {
-        Audit.log(LOGGER,
-          s"Compaction request completed for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.info(s"Compaction request completed for table " +
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
     } else {
-      Audit.log(LOGGER, s"Compaction request failed for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
-      )
       LOGGER.error(s"Compaction request failed for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       throw new Exception("Compaction Failure in Merger Rdd.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 1b9fb44..ece688e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -21,19 +21,15 @@ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.types.{StructField, StructType}
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.spark.StreamingOption
 import org.apache.carbondata.streaming.CarbonStreamException
 
@@ -160,9 +156,6 @@ object StreamJobManager {
         StreamJobDesc(job, streamName, sourceTable.getDatabaseName, sourceTable.getTableName,
           sinkTable.getDatabaseName, sinkTable.getTableName, query, thread))
 
-      Audit.log(LOGGER, s"STREAM $streamName started with job id '${job.id.toString}', " +
-                   s"from ${sourceTable.getDatabaseName}.${sourceTable.getTableName} " +
-                   s"to ${sinkTable.getDatabaseName}.${sinkTable.getTableName}")
       job.id.toString
     } else {
       thread.interrupt()
@@ -181,10 +174,6 @@ object StreamJobManager {
       jobDesc.streamingQuery.stop()
       jobDesc.thread.interrupt()
       jobs.remove(streamName)
-      Audit.log(LOGGER,
-        s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
-                   s"from ${jobDesc.sourceDb}.${jobDesc.sourceTable} " +
-                   s"to ${jobDesc.sinkDb}.${jobDesc.sinkTable}")
     } else {
       if (!ifExists) {
         throw new NoSuchStreamException(streamName)
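
The stop path above is registry bookkeeping with an IF EXISTS escape hatch: remove the job when present, and only fail when the caller did not ask for ifExists. A hedged sketch of that pattern, where a ConcurrentHashMap stands in for the real job registry and NoSuchElementException for NoSuchStreamException:

    import java.util.concurrent.ConcurrentHashMap

    object StreamRegistrySketch {
      private val jobs = new ConcurrentHashMap[String, String]()

      def startStream(streamName: String, jobId: String): Unit =
        jobs.put(streamName, jobId)

      def stopStream(streamName: String, ifExists: Boolean): Unit = {
        // remove() returns null when the stream was never registered
        if (jobs.remove(streamName) == null && !ifExists) {
          throw new NoSuchElementException(s"stream $streamName not found")
        }
      }

      def main(args: Array[String]): Unit = {
        startStream("job1", "uuid-1")
        stopStream("job1", ifExists = false)  // removes the entry
        stopStream("job1", ifExists = true)   // silently ignores the missing stream
      }
    }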

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index ea0f9e7..8fb3cc1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -24,7 +24,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -37,7 +36,7 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging {
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case datamapPostEvent: BuildDataMapPostExecutionEvent =>
-        Audit.log(LOGGER, "Load post status event-listener called for merge bloom index")
+        LOGGER.info("Load post status event-listener called for merge bloom index")
         val carbonTableIdentifier = datamapPostEvent.identifier
         val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
         val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
