This is an automated email from the ASF dual-hosted git repository.

richardantal pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new dc692ad  PHOENIX-6456 Support query logging for DDL and DML
dc692ad is described below

commit dc692ad28a6b4c01c84a852b12e3408940def7c6
Author: Richard Antal <antal97rich...@gmail.com>
AuthorDate: Sun Apr 25 10:14:11 2021 +0200

    PHOENIX-6456 Support query logging for DDL and DML
---
 .../org/apache/phoenix/end2end/AuditLoggingIT.java | 248 +++++++++++++++++++++
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |   7 +
 .../apache/phoenix/jdbc/PhoenixEmbeddedDriver.java |   2 +-
 .../phoenix/jdbc/PhoenixPreparedStatement.java     |   4 +-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  | 105 +++++++--
 .../org/apache/phoenix/log/AuditQueryLogger.java   | 119 ++++++++++
 ...andler.java => QueryLogDetailsWorkHandler.java} |  28 +--
 .../java/org/apache/phoenix/log/QueryLogger.java   |  10 +-
 .../apache/phoenix/log/QueryLoggerDisruptor.java   |  25 ++-
 .../org/apache/phoenix/log/TableLogWriter.java     |   4 +
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   1 +
 12 files changed, 505 insertions(+), 50 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AuditLoggingIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AuditLoggingIT.java
new file mode 100644
index 0000000..e83ff90
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AuditLoggingIT.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class AuditLoggingIT extends ParallelStatsDisabledIT {
+
+    @Test
+    public void testEmptyLogging() throws Exception {
+        String createqQery = "create table test1 (mykey integer not null 
primary key," +
+                " mycolumn varchar)";
+        String upsertQuery = "upsert into test1 values (1,'Hello')";
+        String selectQuery = "select * from test1";
+        String getLogsQuery = "select * from SYSTEM.LOG WHERE 
TABLE_NAME='TEST1' order by start_time";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute(createqQery);
+            stmt.execute(upsertQuery);
+            stmt.executeQuery(selectQuery);
+            conn.commit();
+
+            ResultSet rs = stmt.executeQuery(getLogsQuery);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testLoggingSelect() throws Exception {
+        String createqQery = "create table test2 (mykey integer not null 
primary key," +
+                " mycolumn varchar)";
+        String upsertQuery = "upsert into test2 values (1,'Hello')";
+        String selectQuery = "select * from test2";
+        String getLogsQuery = "select * from SYSTEM.LOG WHERE 
TABLE_NAME='TEST2' order by start_time";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name());
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            conn.createStatement().execute(createqQery);
+            conn.createStatement().execute(upsertQuery);
+            ResultSet rs = conn.createStatement().executeQuery(selectQuery);
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+            rs.close();
+
+            ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery);
+            assertTrue(rs2.next());
+            assertEquals(rs2.getString(7), selectQuery);
+            assertFalse(rs2.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testLoggingDMLAandDDL() throws Exception {
+        String createqQery = "create table test3 (mykey integer not null 
primary key," +
+                " mycolumn varchar)";
+        String upsertQuery = "upsert into test3 values (1,'Hello')";
+        String selectQuery = "select * from test3";
+        String getLogsQuery = "select * from SYSTEM.LOG WHERE 
TABLE_NAME='TEST3' order by start_time";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name());
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            conn.createStatement().execute(createqQery);
+            conn.createStatement().execute(upsertQuery);
+            ResultSet rs = conn.createStatement().executeQuery(selectQuery);
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+            rs.close();
+
+            ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery);
+            assertTrue(rs2.next());
+            assertEquals(rs2.getString(7), createqQery);
+            assertTrue(rs2.next());
+            assertEquals(rs2.getString(7), upsertQuery);
+
+            assertFalse(rs2.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testLoggingDMLAandDDLandSelect() throws Exception {
+        String createqQery = "create table test4 (mykey integer not null 
primary key," +
+                " mycolumn varchar)";
+        String upsertQuery = "upsert into test4 values (1,'Hello')";
+        String selectQuery = "select * from test4";
+        String getLogsQuery = "select * from SYSTEM.LOG WHERE 
TABLE_NAME='TEST4' order by start_time";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name());
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name());
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            Statement stat = conn.createStatement();
+            stat.execute(createqQery);
+            stat.execute(upsertQuery);
+            ResultSet rs = stat.executeQuery(selectQuery);
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+            rs.close();
+
+            ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery);
+            assertTrue(rs2.next());
+            assertEquals(rs2.getString(7), createqQery);
+            assertTrue(rs2.next());
+            assertEquals(rs2.getString(7), upsertQuery);
+            assertTrue(rs2.next());
+            assertEquals(rs2.getString(7), selectQuery);
+
+            assertFalse(rs2.next());
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testLogginParameterizedUpsert() throws Exception {
+        String createqQery = "create table test5 (mykey integer not null 
primary key," +
+                " mycolumn varchar)";
+        String upsertQuery = "upsert into test5 values (?, ?)";
+        String selectQuery = "select * from test5";
+        String getLogsQuery = "select * from SYSTEM.LOG WHERE 
TABLE_NAME='TEST5' order by start_time";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name());
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name());
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            Statement stat = conn.createStatement();
+            stat.execute(createqQery);
+
+
+            PreparedStatement p = conn.prepareStatement(upsertQuery);
+            p.setInt(1, 1);
+            p.setString(2, "foo");
+
+            p.execute();
+
+            p.setInt(1, 2);
+            p.setString(2, "bar");
+
+            p.execute();
+
+            ResultSet rs = stat.executeQuery(selectQuery);
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+            rs.close();
+
+            ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery);
+            assertTrue(rs2.next());
+            assertTrue(rs2.next());
+            assertEquals("1,foo", rs2.getString(13));
+            assertTrue(rs2.next());
+            assertEquals( "2,bar", rs2.getString(13));
+            assertTrue(rs2.next());
+            assertFalse(rs2.next());
+
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testlogSamplingRate() throws Exception {
+        String createqQery = "create table test6 (mykey integer not null 
primary key," +
+                " mycolumn varchar)";
+
+        String selectQuery = "select * from test6";
+        String getLogsQuery = "select * from SYSTEM.LOG WHERE 
TABLE_NAME='TEST6' order by start_time";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name());
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name());
+        props.setProperty(QueryServices.LOG_SAMPLE_RATE, "0.5");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            Statement stat = conn.createStatement();
+            stat.execute(createqQery);
+            String upsertQuery;
+            for (int i = 0; i<100; i++) {
+                upsertQuery  = "upsert into test6 values (" + i + ",'asd')";
+                stat.execute(upsertQuery);
+                ResultSet rs = stat.executeQuery(selectQuery);
+                assertTrue(rs.next());
+                rs.close();
+            }
+
+            ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery);
+            int numOfUpserts = 0;
+            int numOfSelects = 0;
+            while (rs2.next()) {
+                String query = rs2.getString(7);
+                if (query.equals(selectQuery)) {
+                    numOfSelects++;
+                }
+                else if (query.contains("upsert into test6 values (")) {
+                    numOfUpserts++;
+                }
+            }
+            assertEquals(numOfUpserts, 100);
+            assertTrue(numOfSelects > 0 && numOfSelects < 100);
+            System.out.println(numOfSelects);
+
+        } finally {
+            conn.close();
+        }
+    }
+
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index dab4c6a..74aefbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -172,6 +172,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     private TableResultIteratorFactory tableResultIteratorFactory;
     private boolean isRunningUpgrade;
     private LogLevel logLevel;
+    private LogLevel auditLogLevel;
     private Double logSamplingRate;
     private String sourceOfOperation;
 
@@ -381,6 +382,8 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         };
         this.logLevel= 
LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
                 QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+        this.auditLogLevel= 
LogLevel.valueOf(this.services.getProps().get(QueryServices.AUDIT_LOG_LEVEL,
+                QueryServicesOptions.DEFAULT_AUDIT_LOGGING_LEVEL));
         this.isRequestLevelMetricsEnabled = 
JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info,
                 this.services.getProps());
         this.mutationState = mutationState == null ? newMutationState(maxSize,
@@ -1358,6 +1361,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     public LogLevel getLogLevel(){
         return this.logLevel;
     }
+
+    public LogLevel getAuditLogLevel(){
+        return this.auditLogLevel;
+    }
     
     public Double getLogSamplingRate(){
         return this.logSamplingRate;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 989475e..27702c9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -349,7 +349,7 @@ public abstract class PhoenixEmbeddedDriver implements 
Driver, SQLCloseable {
             }
             if(principal == null){
                 if (!isConnectionless) {
-                   principal = props.get(QueryServices.HBASE_CLIENT_PRINCIPAL);
+                    principal = 
props.get(QueryServices.HBASE_CLIENT_PRINCIPAL);
                 }
             }
             if(keytab == null){
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index b54efc8..59d8add 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -170,7 +170,7 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Prepar
             .build().buildException();
         }
         if (statement.getOperation().isMutation()) {
-            executeMutation(statement);
+            executeMutation(statement, 
createAuditQueryLogger(statement,query));
             return false;
         }
         executeQuery(statement, createQueryLogger(statement,query));
@@ -203,7 +203,7 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Prepar
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
             .build().buildException();
         }
-        return executeMutation(statement);
+        return executeMutation(statement, 
createAuditQueryLogger(statement,query));
     }
 
     public QueryPlan optimizeQuery() throws SQLException {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 9be11d2..69774b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -88,6 +88,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.log.AuditQueryLogger;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.log.QueryLogInfo;
 import org.apache.phoenix.log.QueryLogger;
@@ -379,12 +380,37 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
             throw new IllegalStateException(); // Can't happen as 
Throwables.propagate() always throws
         }
     }
-    
-    protected int executeMutation(final CompilableStatement stmt) throws 
SQLException {
-      return executeMutation(stmt, true);
+
+    public String getTargetForAudit(CompilableStatement stmt) {
+        String target = null;
+        try {
+            if (stmt instanceof ExecutableUpsertStatement) {
+                return ((ExecutableUpsertStatement) 
stmt).getTable().getName().toString();
+            } else if (stmt instanceof ExecutableDeleteStatement) {
+                return ((ExecutableDeleteStatement) 
stmt).getTable().getName().toString();
+            } else if (stmt instanceof ExecutableCreateTableStatement) {
+                target = 
((ExecutableCreateTableStatement)stmt).getTableName().toString();
+            } else if (stmt instanceof ExecutableDropTableStatement) {
+                target = 
((ExecutableDropTableStatement)stmt).getTableName().toString();
+            } else if (stmt instanceof ExecutableAddColumnStatement) {
+                target = 
((ExecutableAddColumnStatement)stmt).getTable().getName().toString();
+            } else if (stmt instanceof ExecutableCreateSchemaStatement) {
+                return ((ExecutableCreateSchemaStatement) 
stmt).getSchemaName();
+            } else if (stmt instanceof ExecutableDropSchemaStatement) {
+                target = ((ExecutableDropSchemaStatement)stmt).getSchemaName();
+            }
+        } catch (Exception e) {
+            target = stmt.getClass().getName();
+        }
+        return target;
+    }
+
+
+    protected int executeMutation(final CompilableStatement stmt, final 
AuditQueryLogger queryLogger) throws SQLException {
+        return executeMutation(stmt, true, queryLogger);
     }
 
-    private int executeMutation(final CompilableStatement stmt, final boolean 
doRetryOnMetaNotFoundError) throws SQLException {
+    private int executeMutation(final CompilableStatement stmt, final boolean 
doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger) throws 
SQLException {
         if (connection.isReadOnly()) {
             throw new SQLExceptionInfo.Builder(
                 SQLExceptionCode.READ_ONLY_CONNECTION).
@@ -425,6 +451,13 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
                                 
connection.incrementStatementExecutionCounter();
+                                if(queryLogger.isAuditLoggingEnabled()) {
+                                    queryLogger.log(QueryLogInfo.TABLE_NAME_I, 
getTargetForAudit(stmt));
+                                    
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
+                                    
queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
+                                    queryLogger.syncAudit();
+                                }
+
                                 return lastUpdateCount;
                             }
                             //Force update cache and retry if meta not found 
error occurs
@@ -435,7 +468,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                                     }
                                     if (new 
MetaDataClient(connection).updateCache(connection.getTenantId(),
                                         e.getSchemaName(), e.getTableName(), 
true).wasUpdated()) {
-                                        return executeMutation(stmt, false);
+                                        return executeMutation(stmt, false, 
queryLogger);
                                     }
                                 }
                                 throw e;
@@ -451,6 +484,12 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                     }, PhoenixContextExecutor.inContext(),
                         Tracing.withTracing(connection, this.toString()));
         } catch (Exception e) {
+            if(queryLogger.isAuditLoggingEnabled()) {
+                queryLogger.log(QueryLogInfo.TABLE_NAME_I, 
getTargetForAudit(stmt));
+                queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, 
Throwables.getStackTraceAsString(e));
+                queryLogger.log(QueryLogInfo.QUERY_STATUS_I, 
QueryStatus.FAILED.toString());
+                queryLogger.syncAudit();
+            }
             Throwables.propagateIfInstanceOf(e, SQLException.class);
             Throwables.propagate(e);
             throw new IllegalStateException(); // Can't happen as 
Throwables.propagate() always throws
@@ -1855,25 +1894,47 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
         return compileMutation(stmt, sql);
     }
 
+    public boolean isSystemTable(CompilableStatement stmt) {
+        boolean systemTable = false;
+        TableName tableName = null;
+        if (stmt instanceof ExecutableSelectStatement) {
+            TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
+            if(from instanceof NamedTableNode) {
+                tableName = ((NamedTableNode)from).getName();
+            }
+        } else if (stmt instanceof ExecutableUpsertStatement) {
+            tableName = ((ExecutableUpsertStatement)stmt).getTable().getName();
+        } else if (stmt instanceof ExecutableDeleteStatement) {
+            tableName = ((ExecutableDeleteStatement)stmt).getTable().getName();
+        } else if (stmt instanceof ExecutableAddColumnStatement) {
+            tableName = 
((ExecutableAddColumnStatement)stmt).getTable().getName();
+        }
+
+        if (tableName != null && PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA
+                .equals(tableName.getSchemaName())) {
+            systemTable = true;
+        }
+
+        return systemTable;
+    }
+
     public QueryLogger createQueryLogger(CompilableStatement stmt, String sql) 
throws SQLException {
         if (connection.getLogLevel() == LogLevel.OFF) {
             return QueryLogger.NO_OP_INSTANCE;
         }
 
-        boolean isSystemTable = false;
-        if(stmt instanceof ExecutableSelectStatement) {
-            TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
-            if(from instanceof NamedTableNode) {
-                String schemaName = 
((NamedTableNode)from).getName().getSchemaName();
-                if(schemaName == null) {
-                    schemaName=connection.getSchema();
-                }
-                if 
(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
-                    isSystemTable = true;
-                }
-            }
+        QueryLogger queryLogger = QueryLogger.getInstance(connection, 
isSystemTable(stmt));
+        QueryLoggerUtil.logInitialDetails(queryLogger, 
connection.getTenantId(),
+                connection.getQueryServices(), sql, getParameters());
+        return queryLogger;
+    }
+
+    public AuditQueryLogger createAuditQueryLogger(CompilableStatement stmt, 
String sql) throws SQLException {
+        if (connection.getAuditLogLevel() == LogLevel.OFF) {
+            return AuditQueryLogger.NO_OP_INSTANCE;
         }
-        QueryLogger queryLogger = 
QueryLogger.getInstance(connection,isSystemTable);
+
+        AuditQueryLogger queryLogger = 
AuditQueryLogger.getInstance(connection, isSystemTable(stmt));
         QueryLoggerUtil.logInitialDetails(queryLogger, 
connection.getTenantId(),
                 connection.getQueryServices(), sql, getParameters());
         return queryLogger;
@@ -1890,7 +1951,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
         if (stmt.getOperation().isMutation()) {
             throw new ExecuteQueryNotApplicableException(sql);
         }
-        return executeQuery(stmt,createQueryLogger(stmt,sql));
+        return executeQuery(stmt, createQueryLogger(stmt, sql));
     }
 
     @Override
@@ -1903,7 +1964,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
             .build().buildException();
         }
-        int updateCount = executeMutation(stmt);
+        int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, 
sql));
         flushIfNecessary();
         return updateCount;
     }
@@ -1922,12 +1983,12 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
                 .build().buildException();
             }
-            executeMutation(stmt);
+            executeMutation(stmt, createAuditQueryLogger(stmt, sql));
             flushIfNecessary();
             return false;
         }
         
-        executeQuery(stmt,createQueryLogger(stmt,sql));
+        executeQuery(stmt, createQueryLogger(stmt, sql));
         return true;
     }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/AuditQueryLogger.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/AuditQueryLogger.java
new file mode 100644
index 0000000..8e4fc51
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/AuditQueryLogger.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+
+import java.util.Map;
+
+
+/*
+ * Wrapper for query translator
+ */
+public class AuditQueryLogger extends QueryLogger {
+    private LogLevel auditLogLevel;
+
+
+    private AuditQueryLogger(PhoenixConnection connection) {
+        super(connection);
+        auditLogLevel = connection.getAuditLogLevel();
+
+    }
+
+    private AuditQueryLogger() {
+        super();
+        auditLogLevel = LogLevel.OFF;
+    }
+
+    public static final AuditQueryLogger NO_OP_INSTANCE = new 
AuditQueryLogger() {
+        @Override
+        public void log(QueryLogInfo queryLogInfo, Object info) {
+
+        }
+
+        @Override
+        public boolean isDebugEnabled() {
+            return false;
+        }
+
+        @Override
+        public boolean isInfoEnabled() {
+            return false;
+        }
+
+        @Override
+        public void sync(
+                Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+
+        }
+
+        @Override
+        public void syncAudit(
+                Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+
+        }
+
+        @Override
+        public boolean isSynced(){
+            return true;
+        }
+    };
+
+    public static AuditQueryLogger getInstance(PhoenixConnection connection, 
boolean isSystemTable) {
+        if (connection.getAuditLogLevel() == LogLevel.OFF || isSystemTable) {
+            return NO_OP_INSTANCE;
+        }
+        return new AuditQueryLogger(connection);
+    }
+
+
+    /**
+     *  Is audit logging currently enabled?
+     *  Call this method to prevent having to perform expensive operations 
(for example,
+     *  String concatenation) when the audit log level is more than info.
+     */
+    public boolean isAuditLoggingEnabled(){
+        return isAuditLevelEnabled(LogLevel.INFO);
+    }
+
+    private boolean isAuditLevelEnabled(LogLevel logLevel){
+        return this.auditLogLevel != null && logLevel != LogLevel.OFF ? 
logLevel.ordinal() <= this.auditLogLevel.ordinal()
+                : false;
+    }
+
+
+
+    public void sync(Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+        syncBase(readMetrics, overAllMetrics, auditLogLevel);
+    }
+
+    public void syncAudit() {
+        syncAudit(null, null);
+    }
+
+    /**
+     *  We force LogLevel.TRACE here because in QueryLogInfo the minimum 
LogLevel for
+     *  TABLE_NAME_I is Debug and for BIND_PARAMETERS_I is TRACE and we would 
like to see
+     *  these parameters even in INFO level when using DDL and DML operations.
+     */
+    public void syncAudit(Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+        syncBase(readMetrics, overAllMetrics, LogLevel.TRACE);
+    }
+
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsWorkHandler.java
similarity index 65%
rename from 
phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
rename to 
phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsWorkHandler.java
index ee6b2d6..82d30a2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsWorkHandler.java
@@ -17,36 +17,29 @@
  */
 package org.apache.phoenix.log;
 
-import java.sql.SQLException;
-
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.WorkHandler;
 import org.apache.hadoop.conf.Configuration;
 
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceReportingEventHandler;
+
+public class QueryLogDetailsWorkHandler implements 
WorkHandler<RingBufferEvent>, LifecycleAware {
 
 
-public class QueryLogDetailsEventHandler implements 
SequenceReportingEventHandler<RingBufferEvent>, LifecycleAware {
-    private Sequence sequenceCallback;
     private LogWriter logWriter;
 
-    public QueryLogDetailsEventHandler(Configuration configuration) throws 
SQLException{
+    public QueryLogDetailsWorkHandler(Configuration configuration) {
         this.logWriter = new TableLogWriter(configuration);
     }
-    
-    @Override
-    public void setSequenceCallback(final Sequence sequenceCallback) {
-        this.sequenceCallback = sequenceCallback;
-    }
 
     @Override
-    public void onEvent(final RingBufferEvent event, final long sequence, 
final boolean endOfBatch) throws Exception {
-        logWriter.write(event);
-        event.clear();
+    public void onEvent(RingBufferEvent ringBufferEvent) throws Exception {
+        logWriter.write(ringBufferEvent);
+        ringBufferEvent.clear();
     }
 
     @Override
     public void onStart() {
+
     }
 
     @Override
@@ -59,5 +52,4 @@ public class QueryLogDetailsEventHandler implements 
SequenceReportingEventHandle
             //Ignore
         }
     }
-
-}
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
index 68a4e91..b132bbd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
@@ -43,15 +43,15 @@ public class QueryLogger {
     private boolean isSynced;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryLogger.class);
     
-    private QueryLogger(PhoenixConnection connection) {
+    protected QueryLogger(PhoenixConnection connection) {
         this.queryId = UUID.randomUUID().toString();
         this.queryDisruptor = 
connection.getQueryServices().getQueryDisruptor();
         logLevel = connection.getLogLevel();
         log(QueryLogInfo.QUERY_ID_I, queryId);
         log(QueryLogInfo.START_TIME_I, 
EnvironmentEdgeManager.currentTimeMillis());
     }
-    
-    private QueryLogger() {
+
+    protected QueryLogger() {
         logLevel = LogLevel.OFF;
     }
     
@@ -155,6 +155,10 @@ public class QueryLogger {
     
 
     public void sync(Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+        syncBase(readMetrics, overAllMetrics, logLevel);
+    }
+
+    public void syncBase(Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics, LogLevel logLevel) {
         if (!isSynced) {
             isSynced = true;
             final RingBufferEventTranslator translator = getCachedTranslator();
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
index 3c8f955..935b058 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
@@ -23,7 +23,6 @@ import java.sql.SQLException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.query.QueryServices;
 import org.slf4j.Logger;
@@ -46,6 +45,7 @@ public class QueryLoggerDisruptor implements Closeable{
     private static final int RING_BUFFER_SIZE = 8 * 1024;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryLoggerDisruptor.class);
     private static final String DEFAULT_WAIT_STRATEGY = 
BlockingWaitStrategy.class.getName();
+    private static final int DEFAULT_AUDIT_LOGGER_PROCESS_COUNT = 1;
     
     public QueryLoggerDisruptor(Configuration configuration) throws 
SQLException{
         WaitStrategy waitStrategy;
@@ -74,12 +74,29 @@ public class QueryLoggerDisruptor implements Closeable{
         final ExceptionHandler<RingBufferEvent> errorHandler = new 
QueryLoggerDefaultExceptionHandler();
         disruptor.setDefaultExceptionHandler(errorHandler);
 
-        final QueryLogDetailsEventHandler[] handlers = { new 
QueryLogDetailsEventHandler(configuration) };
-        disruptor.handleEventsWith(handlers);
+        /**
+         * if LOG_HANDLER_COUNT is 1 it will work as the previous 
implementation
+         * if LOG_HANDLER_COUNT is 2 or more then Multi Thread
+         */
+        int handlerCount = configuration.getInt(
+                QueryServices.LOG_HANDLER_COUNT, 
DEFAULT_AUDIT_LOGGER_PROCESS_COUNT);
+
+        if (handlerCount <= 0){
+            LOGGER.error("Audit Log Handler Count must be greater than 0." +
+                    "change to default value, input : " + handlerCount);
+            handlerCount = DEFAULT_AUDIT_LOGGER_PROCESS_COUNT;
+        }
+
+        QueryLogDetailsWorkHandler[] workHandlers = new 
QueryLogDetailsWorkHandler[handlerCount];
+        for (int i = 0; i < handlerCount; i++){
+            workHandlers[i] = new QueryLogDetailsWorkHandler(configuration);
+        }
+        disruptor.handleEventsWithWorkerPool(workHandlers);
+
         LOGGER.info("Starting  QueryLoggerDisruptor for with ringbufferSize=" +
                 disruptor.getRingBuffer().getBufferSize() + ", waitStrategy=" +
                 waitStrategy.getClass().getSimpleName() + ", " + 
"exceptionHandler="
-                + errorHandler + "...");
+                + errorHandler + ", handlerCount=" + handlerCount);
         disruptor.start();
         
     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
index 7dd7b17..966bed4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -96,6 +96,10 @@ public class TableLogWriter implements LogWriter {
             }
         }
 
+        if (connection.isReadOnly()) {
+            return;
+        }
+
         ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo();
         for (QueryLogInfo info : QueryLogInfo.values()) {
             if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()) {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index ac9a396..78f153c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -309,9 +309,11 @@ public interface QueryServices extends SQLCloseable {
     public static final String WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB =
             "phoenix.query.wildcard.dynamicColumns";
     public static final String LOG_LEVEL = "phoenix.log.level";
+    public static final String AUDIT_LOG_LEVEL = "phoenix.audit.log.level";
     public static final String LOG_BUFFER_SIZE = "phoenix.log.buffer.size";
     public static final String LOG_BUFFER_WAIT_STRATEGY = 
"phoenix.log.wait.strategy";
     public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate";
+    public static final String LOG_HANDLER_COUNT = "phoenix.log.handler.count";
 
        public static final String SYSTEM_CATALOG_SPLITTABLE = 
"phoenix.system.catalog.splittable";
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 85f932b..ac343de 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -361,6 +361,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
     public static final boolean DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB = 
false;
     public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name();
+    public static final String DEFAULT_AUDIT_LOGGING_LEVEL = 
LogLevel.OFF.name();
     public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";
     public static final int DEFAULT_LOG_SALT_BUCKETS = 32;
     public static final int DEFAULT_SALT_BUCKETS = 0;

Reply via email to