shaofengshi closed pull request #362: KYLIN-3700 Quote sql identities when creating flat table
URL: https://github.com/apache/kylin/pull/362
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 707b3f33c9..8a99906e3e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -68,7 +68,8 @@ public String build() {
         case CLI:
             buf.append("hive -e \"");
             for (String statement : statements) {
-                buf.append(statement).append("\n");
+                // in bash, " and ` need to be escaped with \
+                buf.append(statement.replaceAll("`", "\\\\`")).append("\n");
             }
             buf.append("\"");
             buf.append(parseProps());
@@ -79,7 +80,7 @@ public String build() {
             try {
                 tmpHqlPath = "/tmp/" + UUID.randomUUID().toString() + ".hql";
                 for (String statement : statements) {
-                    hql.append(statement);
+                    hql.append(statement.replaceAll("`", "\\\\`"));
                     hql.append("\n");
                 }
                 String createFileCmd = String.format(Locale.ROOT, 
CREATE_HQL_TMP_FILE_TEMPLATE, tmpHqlPath, hql);
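
For illustration, a minimal standalone sketch (hypothetical class name, not part of the patch) of what the replaceAll above does: each backtick gets a backslash prefix so it survives the double-quoted hive -e "..." invocation in bash.

    public class BacktickEscapeDemo {
        public static void main(String[] args) {
            String statement = "DROP TABLE `test`;";
            // same call as in HiveCmdBuilder: the replacement "\\\\`" emits a literal \ before each `
            String escaped = statement.replaceAll("`", "\\\\`");
            System.out.println("hive -e \"" + escaped + "\"");
            // prints: hive -e "DROP TABLE \`test\`;"
        }
    }
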
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
index ecc8961e33..8c852c1197 100644
--- 
a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
@@ -61,12 +61,12 @@ public void testHiveCLI() {
         hivePropsOverwrite.put("hive.execution.engine", "tez");
         HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement("USE default;");
-        hiveCmdBuilder.addStatement("DROP TABLE test;");
+        hiveCmdBuilder.addStatement("DROP TABLE `test`;");
         hiveCmdBuilder.addStatement("SHOW\n TABLES;");
         hiveCmdBuilder.setHiveConfProps(hiveProps);
         hiveCmdBuilder.overwriteHiveProps(hivePropsOverwrite);
         assertEquals(
-                "hive -e \"USE default;\nDROP TABLE test;\nSHOW\n TABLES;\n\" 
--hiveconf hive.execution.engine=tez",
+                "hive -e \"USE default;\nDROP TABLE \\`test\\`;\nSHOW\n 
TABLES;\n\" --hiveconf hive.execution.engine=tez",
                 hiveCmdBuilder.build());
     }
 
@@ -80,7 +80,7 @@ public void testBeeline() throws IOException {
 
         HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement("USE default;");
-        hiveCmdBuilder.addStatement("DROP TABLE test;");
+        hiveCmdBuilder.addStatement("DROP TABLE `test`;");
         hiveCmdBuilder.addStatement("SHOW TABLES;");
 
         String cmd = hiveCmdBuilder.build();
@@ -91,7 +91,7 @@ public void testBeeline() throws IOException {
         Pair<Integer, String> execute = 
cliCommandExecutor.execute(createFileCmd);
         String hqlStatement = FileUtils.readFileToString(new File(hqlFile), 
Charset.defaultCharset());
         assertEquals(
-                "USE default;" + lineSeparator + "DROP TABLE test;" + 
lineSeparator + "SHOW TABLES;" + lineSeparator,
+                "USE default;" + lineSeparator + "DROP TABLE `test`;" + 
lineSeparator + "SHOW TABLES;" + lineSeparator,
                 hqlStatement);
         assertBeelineCmd(cmd);
         FileUtils.forceDelete(new File(hqlFile));
@@ -105,7 +105,7 @@ public void testSparkSqlForTableOps() throws IOException {
 
         HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement("USE default;");
-        hiveCmdBuilder.addStatement("DROP TABLE test;");
+        hiveCmdBuilder.addStatement("DROP TABLE `test`;");
         hiveCmdBuilder.addStatement("SHOW TABLES;");
         String cmd = hiveCmdBuilder.build();
         assertBeelineCmd(cmd);
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java 
b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index ff48244687..d7e3b72e28 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -41,6 +41,9 @@
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import static org.apache.kylin.job.util.FlatTableSqlQuoteUtils.quote;
+import static org.apache.kylin.job.util.FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -124,7 +127,7 @@ public static String 
generateInsertDataStatement(IJoinedFlatTableDesc flatDesc)
             }
         }
 
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + 
generateSelectDataStatement(flatDesc)
+        return "INSERT OVERWRITE TABLE " + quote(flatDesc.getTableName()) + " 
" + generateSelectDataStatement(flatDesc)
                 + ";\n";
     }
 
@@ -146,10 +149,14 @@ public static String 
generateSelectDataStatement(IJoinedFlatTableDesc flatDesc,
                 sql.append(",");
             }
             String colTotalName = String.format(Locale.ROOT, "%s.%s", 
col.getTableRef().getTableName(), col.getName());
+            String quotedColTotalName = String.format(Locale.ROOT, "%s.%s",
+                    quote(col.getTableRef().getTableName()),
+                    quote(col.getName()));
             if (skipAsList.contains(colTotalName)) {
-                sql.append(col.getExpressionInSourceDB() + sep);
+                sql.append(getQuotedColExpressionInSourceDB(flatDesc, 
col)).append(sep);
             } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + 
colName(col, true) + sep);
+                sql.append(getQuotedColExpressionInSourceDB(flatDesc, 
col)).append(" as ")
+                        .append(quote(colName(col))).append(sep);
             }
         }
         appendJoinStatement(flatDesc, sql, singleLine);
@@ -157,13 +164,14 @@ public static String 
generateSelectDataStatement(IJoinedFlatTableDesc flatDesc,
         return sql.toString();
     }
 
-    public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, 
StringBuilder sql, boolean singleLine) {
+    static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, 
StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
         Set<TableRef> dimTableCache = new HashSet<>();
 
         DataModelDesc model = flatDesc.getDataModel();
         TableRef rootTable = model.getRootFactTable();
-        sql.append("FROM " + rootTable.getTableIdentity() + " as " + 
rootTable.getAlias() + " " + sep);
+        sql.append(" FROM 
").append(flatDesc.getDataModel().getRootFactTable().getTableIdentityQuoted("`"))
+                .append(" as 
").append(quote(rootTable.getAlias())).append(sep);
 
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
@@ -177,13 +185,15 @@ public static void 
appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuil
                     }
                     String joinType = join.getType().toUpperCase(Locale.ROOT);
 
-                    sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
+                    sql.append(joinType).append(" JOIN ").append(dimTable.getTableIdentityQuoted("`"))
+                            .append(" as ").append(quote(dimTable.getAlias())).append(sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
                         if (i > 0) {
                             sql.append(" AND ");
                         }
-                        sql.append(fk[i].getExpressionInSourceDB() + " = " + pk[i].getExpressionInSourceDB());
+                        sql.append(getQuotedColExpressionInSourceDB(flatDesc, fk[i])).append(" = ")
+                                .append(getQuotedColExpressionInSourceDB(flatDesc, pk[i]));
                     }
                     sql.append(sep);
 
@@ -218,9 +228,10 @@ private static void 
appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBu
 
         DataModelDesc model = flatDesc.getDataModel();
         if (StringUtils.isNotEmpty(model.getFilterCondition())) {
-            whereBuilder.append(" AND 
(").append(model.getFilterCondition()).append(") ");
+            String quotedFilterCondition = quoteIdentifierInSqlExpr(flatDesc,
+                    model.getFilterCondition(), "`");
+            whereBuilder.append(" AND 
(").append(quotedFilterCondition).append(") "); // -> filter condition contains 
special character may cause bug
         }
-
         if (flatDesc.getSegment() != null) {
             PartitionDesc partDesc = model.getPartitionDesc();
             if (partDesc != null && partDesc.getPartitionDateColumn() != null) 
{
@@ -228,8 +239,9 @@ private static void 
appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBu
 
                 if (segRange != null && !segRange.isInfinite()) {
                     whereBuilder.append(" AND (");
-                    whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
-                            flatDesc.getSegment(), segRange));
+                    String quotedPartitionCond = quoteIdentifierInSqlExpr(flatDesc,
+                            partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange), "`");
+                    whereBuilder.append(quotedPartitionCond);
                     whereBuilder.append(")" + sep);
                 }
             }
@@ -265,7 +277,7 @@ private static String getHiveDataType(String javaDataType) {
     public static String 
generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc 
cubeDesc) {
         final String tableName = flatDesc.getTableName();
         StringBuilder sql = new StringBuilder();
-        sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + 
tableName);
+        sql.append("INSERT OVERWRITE TABLE " + quote(tableName) + " SELECT * 
FROM " + quote(tableName));
 
         if (flatDesc.getClusterBy() != null) {
             appendClusterStatement(sql, flatDesc.getClusterBy());
@@ -291,4 +303,13 @@ public static String 
generateRedistributeFlatTableStatement(IJoinedFlatTableDesc
         return sql.toString();
     }
 
+    public static String getQuotedColExpressionInSourceDB(IJoinedFlatTableDesc flatDesc, TblColRef col) {
+        if (!col.getColumnDesc().isComputedColumn()) {
+            return quote(col.getTableAlias()) + "."
+                    + quote(col.getName());
+        } else {
+            String computeExpr = col.getColumnDesc().getComputedColumnExpr();
+            return quoteIdentifierInSqlExpr(flatDesc, computeExpr, "`");
+        }
+    }
 }
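
To make the effect of the new quoting concrete, a small sketch (assumed table, column and flat-table names; not part of the patch) of the identifiers JoinedFlatTable now emits:

    import static org.apache.kylin.job.util.FlatTableSqlQuoteUtils.quote;

    public class FlatTableQuoteSketch {
        public static void main(String[] args) {
            // quote() wraps an identifier in backticks
            System.out.println(quote("KYLIN_SALES"));                          // `KYLIN_SALES`
            // alias.column, the shape getQuotedColExpressionInSourceDB returns for plain (non-computed) columns
            System.out.println(quote("KYLIN_SALES") + "." + quote("PRICE"));   // `KYLIN_SALES`.`PRICE`
            // the INSERT statement now targets a quoted flat table name (name here is made up)
            System.out.println("INSERT OVERWRITE TABLE " + quote("kylin_intermediate_test_cube"));
        }
    }
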
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtils.java 
b/core-job/src/main/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtils.java
new file mode 100644
index 0000000000..4085d0a2ff
--- /dev/null
+++ 
b/core-job/src/main/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtils.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class FlatTableSqlQuoteUtils {
+
+    public static final String QUOTE = "`";
+
+    /**
+     * Quote an identifier with the default quote character `
+     * @param identifier the identifier to quote
+     * @return the quoted identifier
+     */
+    public static String quote(String identifier) {
+        return QUOTE + identifier + QUOTE;
+    }
+
+    /**
+     * Used to quote identifiers in the SQL filter expression and computed column expressions of a flat table
+     * @param flatDesc the flat table descriptor
+     * @param sqlExpr the SQL expression to process
+     * @param quotation the quote character to use
+     * @return the expression with its identifiers quoted
+     */
+    public static String quoteIdentifierInSqlExpr(IJoinedFlatTableDesc flatDesc, String sqlExpr, String quotation) {
+        Map<String, String> tabToAliasMap = 
buildTableToTableAliasMap(flatDesc);
+        Map<String, Map<String, String>> tabToColsMap = 
buildTableToColumnsMap(flatDesc);
+
+        boolean tableMatched = false;
+        for (String table : tabToAliasMap.keySet()) {
+            List<String> tabPatterns = getTableNameOrAliasPatterns(table);
+            if (isIdentifierNeedToQuote(sqlExpr, table, tabPatterns)) {
+                sqlExpr = quoteIdentifier(sqlExpr, quotation, table, 
tabPatterns);
+                tableMatched = true;
+            }
+
+            String tabAlias = tabToAliasMap.get(table);
+            List<String> tabAliasPatterns = 
getTableNameOrAliasPatterns(tabAlias);
+            if (isIdentifierNeedToQuote(sqlExpr, tabAlias, tabAliasPatterns)) {
+                sqlExpr = quoteIdentifier(sqlExpr, quotation, tabAlias, 
tabAliasPatterns);
+                tableMatched = true;
+            }
+
+            if (tableMatched) {
+                Set<String> columns = listColumnsInTable(table, tabToColsMap);
+                for (String column : columns) {
+                    List<String> colPatterns = 
getColumnNameOrAliasPatterns(column);
+                    if (isIdentifierNeedToQuote(sqlExpr, column, colPatterns)) 
{
+                        sqlExpr = quoteIdentifier(sqlExpr, quotation, column, 
colPatterns);
+                    }
+                    if (columnHasAlias(table, column, tabToColsMap)) {
+                        String colAlias = getColumnAlias(table, column, tabToColsMap);
+                        List<String> colAliasPattern = getColumnNameOrAliasPatterns(colAlias);
+                        if (isIdentifierNeedToQuote(sqlExpr, colAlias, colAliasPattern)) {
+                            sqlExpr = quoteIdentifier(sqlExpr, quotation, colAlias, colAliasPattern);
+                        }
+                    }
+                }
+            }
+
+            tableMatched = false; //reset
+        }
+        return sqlExpr;
+    }
+
+    /**
+     * Used to quote identifiers for the JDBC ext job when quoting a computed column expression
+     * @param tableDesc the table descriptor
+     * @param sqlExpr the SQL expression to process
+     * @param quot the quote character to use
+     * @return the expression with its identifiers quoted
+     */
+    public static String quoteIdentifierInSqlExpr(TableDesc tableDesc, String sqlExpr, String quot) {
+        String table = tableDesc.getName();
+        boolean tableMatched = false;
+        List<String> tabPatterns = getTableNameOrAliasPatterns(table);
+        if (isIdentifierNeedToQuote(sqlExpr, table, tabPatterns)) {
+            sqlExpr = quoteIdentifier(sqlExpr, quot, table, tabPatterns);
+            tableMatched = true;
+        }
+
+        if (tableMatched) {
+            for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+                String column = columnDesc.getName();
+                List<String> colPatterns = 
getColumnNameOrAliasPatterns(column);
+                if (isIdentifierNeedToQuote(sqlExpr, column, colPatterns)) {
+                    sqlExpr = quoteIdentifier(sqlExpr, quot, column, 
colPatterns);
+                }
+            }
+        }
+
+        return sqlExpr;
+    }
+
+    public static List<String> getTableNameOrAliasPatterns(String tableName) {
+        // Each pattern must contain three regex groups, with the identifier in the second group ($2)
+        List<String> patterns = Lists.newArrayList();
+        patterns.add("([+\\-*/%&|^=><\\s,(])(" + tableName.trim() + ")(\\.)");
+        patterns.add("([\\.\\s])(" + tableName.trim() + ")([,\\s)])");
+        patterns.add("(^)(" + tableName.trim() + ")([\\.])");
+        return patterns;
+    }
+
+    public static List<String> getColumnNameOrAliasPatterns(String colName) {
+        // Each pattern must contain three regex groups, with the identifier in the second group ($2)
+        List<String> patterns = Lists.newArrayList();
+        patterns.add("([\\.\\s(])(" + colName.trim() + 
")([+\\-*/%&|^=><\\s,)])");
+        patterns.add("(^)(" + colName.trim() + ")([+\\-*/%&|^=><\\s,)])");
+        return patterns;
+    }
+
+    // visible for test
+    static String quoteIdentifier(String sqlExpr, String quotation, String 
identifier,
+                                  List<String> identifierPatterns) {
+        String quotedIdentifier = quotation + identifier.trim() + quotation;
+
+        for (String pattern : identifierPatterns) {
+            Matcher matcher = Pattern.compile(pattern, 
Pattern.CASE_INSENSITIVE | Pattern.DOTALL).matcher(sqlExpr);
+            if (matcher.find()) {
+                sqlExpr = matcher.replaceAll("$1" + quotedIdentifier + "$3");
+            }
+        }
+        return sqlExpr;
+    }
+
+    public static boolean isIdentifierNeedToQuote(String sqlExpr, String 
identifier, List<String> identifierPatterns) {
+        if (StringUtils.isBlank(sqlExpr) || StringUtils.isBlank(identifier)) {
+            return false;
+        }
+
+        for (String pattern : identifierPatterns) {
+            if (Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL).matcher(sqlExpr).find()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static Map<String, String> 
buildTableToTableAliasMap(IJoinedFlatTableDesc flatDesc) {
+        Map<String, String> map = Maps.newHashMap();
+        List<TblColRef> colRefs = flatDesc.getAllColumns();
+        for (TblColRef colRef : colRefs) {
+            String tableName = colRef.getTableRef().getTableName();
+            String alias = colRef.getTableAlias();
+            map.put(tableName, alias);
+        }
+        return map;
+    }
+
+    private static Map<String, Map<String, String>> 
buildTableToColumnsMap(IJoinedFlatTableDesc flatDesc) {
+        Map<String, Map<String, String>> map = Maps.newHashMap();
+        List<TblColRef> colRefs = flatDesc.getAllColumns();
+        for (TblColRef colRef : colRefs) {
+            String colName = colRef.getName();
+            String tableName = colRef.getTableRef().getTableName();
+            String colAlias = colRef.getTableAlias() + "_" + colRef.getName();
+            if (map.containsKey(tableName)) {
+                map.get(tableName).put(colName, colAlias);
+            } else {
+                Map<String, String> colToAliasMap = Maps.newHashMap();
+                colToAliasMap.put(colName, colAlias);
+                map.put(tableName, colToAliasMap);
+            }
+        }
+        return map;
+    }
+
+    private static Map<String, String> getColToColAliasMapInTable(String 
tableName,
+                                                                  Map<String, 
Map<String, String>> tableToColumnsMap) {
+        if (tableToColumnsMap.containsKey(tableName)) {
+            return tableToColumnsMap.get(tableName);
+        }
+        return Maps.newHashMap();
+    }
+
+    private static Set<String> listColumnsInTable(String tableName,
+                                                  Map<String, Map<String, 
String>> tableToColumnsMap) {
+        Map<String, String> colToAliasMap = 
getColToColAliasMapInTable(tableName, tableToColumnsMap);
+        return colToAliasMap.keySet();
+    }
+
+    private static boolean columnHasAlias(String tableName, String columnName,
+                                          Map<String, Map<String, String>> 
tableToColumnsMap) {
+        Map<String, String> colToAliasMap = 
getColToColAliasMapInTable(tableName, tableToColumnsMap);
+        if (colToAliasMap.containsKey(columnName)) {
+            return true;
+        }
+        return false;
+    }
+
+    private static String getColumnAlias(String tableName, String columnName,
+                                         Map<String, Map<String, String>> 
tableToColumnsMap) {
+        Map<String, String> colToAliasMap = 
getColToColAliasMapInTable(tableName, tableToColumnsMap);
+        if (colToAliasMap.containsKey(columnName)) {
+            return colToAliasMap.get(columnName);
+        }
+        return null;
+    }
+}
\ No newline at end of file
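
The pattern methods above rely on a three-group convention: groups 1 and 3 capture the characters around the identifier and group 2 the identifier itself, so quoteIdentifier can substitute "$1" + quoted + "$3". An ad-hoc sketch of that mechanism (mirroring what the tests below exercise):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class QuotePatternSketch {
        public static void main(String[] args) {
            String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
            // one of the patterns produced by getTableNameOrAliasPatterns("KYLIN_SALES")
            Pattern p = Pattern.compile("([+\\-*/%&|^=><\\s,(])(KYLIN_SALES)(\\.)",
                    Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
            Matcher m = p.matcher(expr);
            // groups 1 and 3 keep the surrounding characters, group 2 is the identifier being quoted
            System.out.println(m.replaceAll("$1`KYLIN_SALES`$3"));
            // prints: KYLIN_SALES.PRICE * `KYLIN_SALES`.COUNT
            // (the leading occurrence has no preceding character and is handled by the "(^)(...)" pattern instead)
        }
    }
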
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtilsTest.java
 
b/core-job/src/test/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtilsTest.java
new file mode 100644
index 0000000000..f40971c66b
--- /dev/null
+++ 
b/core-job/src/test/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class FlatTableSqlQuoteUtilsTest {
+
+    @Test
+    public void testQuoteTableName() {
+        List<String> tablePatterns = 
FlatTableSqlQuoteUtils.getTableNameOrAliasPatterns("KYLIN_SALES");
+        String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String expectedExpr = "`KYLIN_SALES`.PRICE * `KYLIN_SALES`.COUNT";
+        String quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "`KYLIN_SALES`.PRICE * KYLIN_SALES.COUNT";
+        expectedExpr = "`KYLIN_SALES`.PRICE * `KYLIN_SALES`.COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS 
KYLIN_SALES_COUNT";
+        expectedExpr = "`KYLIN_SALES`.PRICE AS KYLIN_SALES_PRICE * 
`KYLIN_SALES`.COUNT AS KYLIN_SALES_COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE > 1 and 
KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        expectedExpr = "(`KYLIN_SALES`.PRICE AS KYLIN_SALES_PRICE > 1 and 
`KYLIN_SALES`.COUNT AS KYLIN_SALES_COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+    }
+
+    @Test
+    public void testQuoteTableAliasName() {
+        List<String> tablePatterns = 
FlatTableSqlQuoteUtils.getTableNameOrAliasPatterns("KYLIN_SALES_ALIAS");
+        String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String expectedExpr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS 
KYLIN_SALES_COUNT";
+        expectedExpr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * 
KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE > 1 and 
KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        expectedExpr = "(KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE > 1 and 
KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(KYLIN_SALES_ALIAS.PRICE AS KYLIN_SALES_PRICE > 1 and 
KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        expectedExpr = "(`KYLIN_SALES_ALIAS`.PRICE AS KYLIN_SALES_PRICE > 1 
and KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+    }
+
+    @Test
+    public void testQuoteColumnName() {
+        List<String> columnPatterns = 
FlatTableSqlQuoteUtils.getColumnNameOrAliasPatterns("PRICE");
+        String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String expectedExpr = "KYLIN_SALES.`PRICE` * KYLIN_SALES.COUNT";
+        String quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE/KYLIN_SALES.COUNT";
+        expectedExpr = "KYLIN_SALES.`PRICE`/KYLIN_SALES.COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS 
KYLIN_SALES_COUNT";
+        expectedExpr = "KYLIN_SALES.`PRICE` AS KYLIN_SALES_PRICE * 
KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(PRICE > 1 AND COUNT > 50)";
+        expectedExpr = "(`PRICE` > 1 AND COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "PRICE>1 and `PRICE` < 15";
+        expectedExpr = "`PRICE`>1 and `PRICE` < 15";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", 
"PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+    }
+
+    @Test
+    public void testIsTableNameOrAliasNeedToQuote() {
+        List<String> tablePatterns = 
FlatTableSqlQuoteUtils.getTableNameOrAliasPatterns("kylin_sales");
+        
Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES.PRICE
 * KYLIN_SALES.COUNT",
+                "kylin_sales", tablePatterns));
+        
Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES.PRICE*KYLIN_SALES.COUNT",
+                "kylin_sales", tablePatterns));
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS 
KYLIN_SALES_COUNT", "kylin_sales",
+                tablePatterns));
+        Assert.assertTrue(
+                
FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES.PRICE>1", 
"kylin_sales", tablePatterns));
+        
Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("(KYLIN_SALES.PRICE
 * KYLIN_SALES.COUNT)",
+                "kylin_sales", tablePatterns));
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "`KYLIN_SALES`.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT 
AS KYLIN_SALES_COUNT", "kylin_sales",
+                tablePatterns));
+
+        
Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("`KYLIN_SALES`.PRICE
 * `KYLIN_SALES`.COUNT",
+                "kylin_sales", tablePatterns));
+        Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "\"KYLIN_SALES\".PRICE * \"KYLIN_SALES\".COUNT", 
"kylin_sales", tablePatterns));
+        Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "\'KYLIN_SALES\'.PRICE * \'KYLIN_SALES\'.COUNT", 
"kylin_sales", tablePatterns));
+        
Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES_PRICE
 * KYLIN_SALES_COUNT",
+                "kylin_sales", tablePatterns));
+    }
+}
\ No newline at end of file
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 6bbc70c96c..c603098325 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -219,6 +219,18 @@ public String getIdentity() {
         return identity;
     }
 
+    public String getIdentityQuoted(String quot) {
+        String dbName = quot + this.getDatabase() + quot;
+        String tableName = quot + this.getName() + quot;
+        return String.format(Locale.ROOT, "%s.%s", dbName, 
tableName).toUpperCase(Locale.ROOT);
+    }
+
+    public String getFactTableQuoted(String quot) {
+        String database = quot + config.getHiveDatabaseForIntermediateTable() 
+ quot;
+        String table = quot + this.getName() + "_fact" + quot;
+        return database + "." + table;
+    }
+
     public boolean isView() {
         return TABLE_TYPE_VIRTUAL_VIEW.equals(tableType);
     }
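
A standalone sketch (assumed database and table names, not from the patch) mirroring getIdentityQuoted above, mainly to show that the whole quoted identity is upper-cased:

    import java.util.Locale;

    public class IdentityQuotedSketch {
        static String identityQuoted(String database, String tableName, String quot) {
            String db = quot + database + quot;
            String tbl = quot + tableName + quot;
            return String.format(Locale.ROOT, "%s.%s", db, tbl).toUpperCase(Locale.ROOT);
        }

        public static void main(String[] args) {
            System.out.println(identityQuoted("default", "kylin_sales", "`")); // `DEFAULT`.`KYLIN_SALES`
        }
    }
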
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 7f0e09c6be..21eb2fd4ec 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -68,6 +68,10 @@ public String getTableIdentity() {
         return table.getIdentity();
     }
 
+    public String getTableIdentityQuoted(String quotation) {
+        return table.getIdentityQuoted(quotation);
+    }
+
     public TblColRef getColumn(String name) {
         return columns.get(name);
     }
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
index 3669a32da1..3d251d02de 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
@@ -37,21 +37,30 @@
 import org.apache.kylin.sdk.datasource.framework.def.DataSourceDefProvider;
 
 import com.google.common.cache.Cache;
+import com.google.common.base.Joiner;
 import com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Extends this Abstract class to create Adaptors for new jdbc data source.
  */
 public abstract class AbstractJdbcAdaptor implements Closeable {
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(AbstractJdbcAdaptor.class);
     protected final BasicDataSource dataSource;
     protected final AdaptorConfig config;
     protected final DataSourceDef dataSourceDef;
     protected SqlConverter.IConfigurer configurer;
+    protected final Cache<String, List<String>> columnsCache = 
CacheBuilder.newBuilder()
+            .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();
     protected final Cache<String, List<String>> databasesCache = 
CacheBuilder.newBuilder()
             .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();
     protected final Cache<String, List<String>> tablesCache = 
CacheBuilder.newBuilder()
             .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();
 
+    private static Joiner joiner = Joiner.on("_");
+
     /**
      * Default constructor method.
      * @param config Basic configuration of JDBC source, such as driver name, 
URL, username, password.
@@ -267,11 +276,11 @@ public String getDataSourceId() {
     public abstract String fixSql(String sql);
 
     /**
-     * fix case sensitive
-     * @param sql
+     * fix the case sensitivity of an identifier
+     * @param identifier
      * @return
      */
-    public abstract String fixCaseSensitiveSql(String sql);
+    public abstract String fixIdentifierCaseSensitve(String identifier);
 
     /**
      * To list all the available database names from JDBC source.
@@ -288,10 +297,20 @@ public String getDataSourceId() {
      * @throws SQLException
      */
     public List<String> listDatabasesWithCache() throws SQLException {
+        return listDatabasesWithCache(false);
+    }
+
+    /**
+     * list databases with cache
+     * @param init
+     * @return
+     * @throws SQLException
+     */
+    public List<String> listDatabasesWithCache(boolean init) throws 
SQLException {
         if (configurer.enableCache()) {
             String cacheKey = config.datasourceId + config.url + "_databases";
-            List<String> cachedDatabases = 
databasesCache.getIfPresent(cacheKey);
-            if (cachedDatabases == null) {
+            List<String> cachedDatabases;
+            if (init || (cachedDatabases = 
databasesCache.getIfPresent(cacheKey)) == null) {
                 cachedDatabases = listDatabases();
                 databasesCache.put(cacheKey, cachedDatabases);
             }
@@ -312,14 +331,15 @@ public String getDataSourceId() {
     /**
      * list tables with cache
      * @param database
+     * @param init
      * @return
      * @throws SQLException
      */
-    public List<String> listTablesWithCache(String database) throws 
SQLException{
+    public List<String> listTablesWithCache(String database, boolean init) 
throws SQLException {
         if (configurer.enableCache()) {
-            String cacheKey = config.datasourceId + config.url + "_tables";
-            List<String> cachedTables = tablesCache.getIfPresent(cacheKey);
-            if (cachedTables == null) {
+            String cacheKey = joiner.join(config.datasourceId, config.url, 
database, "tables");
+            List<String> cachedTables;
+            if (init || (cachedTables = tablesCache.getIfPresent(cacheKey)) == 
null) {
                 cachedTables = listTables(database);
                 tablesCache.put(cacheKey, cachedTables);
             }
@@ -328,6 +348,10 @@ public String getDataSourceId() {
         return listTables(database);
     }
 
+    public List<String> listTablesWithCache(String database) throws 
SQLException {
+        return listTablesWithCache(database, false);
+    }
+
     /**
      * To get the metadata in form of <C>javax.sql.rowset.CachedRowSet</C> for 
a table inside a database.
      * @param database The given database name
@@ -376,5 +400,49 @@ public String getDataSourceId() {
      * @return A set of SQL Statements which can be executed in JDBC source.
      */
     public abstract String[] buildSqlToCreateView(String viewName, String sql);
+
+    /**
+     * To list all the available columns inside a table in database from JDBC 
source.
+     * Developers can overwrite this method to do some filtering work.
+     * @param database The given database.
+     * @param tableName The given table name
+     * @return The list of all the available columns of a table.
+     * @throws SQLException If metadata fetch failed.
+     */
+    public abstract List<String> listColumns(String database, String 
tableName) throws SQLException;
+
+    /**
+     * list columns with cache
+     * @param database
+     * @return
+     * @throws SQLException
+     */
+    public List<String> listColumnsWithCache(String database, String 
tableName) throws SQLException {
+        return listColumnsWithCache(database, tableName, false);
+    }
+
+    /**
+     * list columns with cache
+     * @param database
+     * @return
+     * @throws SQLException
+     */
+    public List<String> listColumnsWithCache(String database, String 
tableName, boolean init) throws SQLException {
+        if (configurer.enableCache()) {
+            String cacheKey = config.datasourceId + config.url + "_" + 
tableName + "_columns";
+            List<String> cachedColumns;
+            if (init || (cachedColumns = columnsCache.getIfPresent(cacheKey)) 
== null) {
+                cachedColumns = listColumns(database, tableName);
+                columnsCache.put(cacheKey, cachedColumns);
+            }
+            return cachedColumns;
+        }
+        return listColumns(database, tableName);
+
+    }
+
+    public boolean isCaseSensitive() {
+        return configurer.isCaseSensitive();
+    }
 }
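
The new caches all follow the same pattern: an init flag bypasses the cache and forces a reload, which JdbcConnector now uses when listing databases and tables. A self-contained sketch of that pattern under assumed key and value choices (not the real adaptor):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class InitCacheSketch {
        private final Cache<String, List<String>> databasesCache = CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();

        List<String> listDatabases() {                      // stands in for the real JDBC metadata call
            return Arrays.asList("DEFAULT", "EDW");
        }

        List<String> listDatabasesWithCache(boolean init) {
            String cacheKey = "demo_databases";
            List<String> cached;
            if (init || (cached = databasesCache.getIfPresent(cacheKey)) == null) {
                cached = listDatabases();                   // reload and refresh the cache entry
                databasesCache.put(cacheKey, cached);
            }
            return cached;
        }

        public static void main(String[] args) {
            InitCacheSketch sketch = new InitCacheSketch();
            System.out.println(sketch.listDatabasesWithCache(false)); // loads once and caches
            System.out.println(sketch.listDatabasesWithCache(true));  // forces a reload, as JdbcConnector.listDatabases now does
        }
    }
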
 
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
index 3a14a1de05..3e5a7661d4 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
@@ -27,15 +27,18 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-
 import javax.sql.rowset.CachedRowSet;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * A default implementation for <C>AbstractJdbcAdaptor</C>. By default, this 
adaptor supposed to support most cases.
  * Developers can just extends this class and modify some methods if found 
somewhere unsupported.
  */
 public class DefaultAdaptor extends AbstractJdbcAdaptor {
 
+    protected static final String QUOTE_REG_LFT = "[`\"\\[]*";
+    protected static final String QUOTE_REG_RHT = "[`\"\\]]*";
     private final static String [] POSSIBLE_TALBE_END= {",", " ", ")", "\r", 
"\n", "."};
 
     public DefaultAdaptor(AdaptorConfig config) throws Exception {
@@ -138,36 +141,6 @@ public String fixSql(String sql) {
         return sql;
     }
 
-    /**
-     * All known defects:
-     * Can not support one database has two toUppercase-same tables (e.g. 
ACCOUNT and account table can't coexist in one database)
-     * @param sql The SQL statement to be fixed.
-     * @return The changed sql
-     */
-    @Override
-    public String fixCaseSensitiveSql(String sql) {
-        try {
-            String orig = sql.toUpperCase(Locale.ROOT);
-            List<String> databases = listDatabasesWithCache();
-            String category = "";
-            for (String c : databases) {
-                if 
(orig.contains(c.toUpperCase(Locale.ROOT)+".")||orig.contains(c.toUpperCase(Locale.ROOT)+'"'))
 {
-                    sql = sql.replaceAll(c.toUpperCase(Locale.ROOT), c);
-                    category = c;
-                }
-            }
-            List<String> tables = listTables(category);
-            for (String table : tables) {
-                if(checkSqlContainstable(orig, table)) {
-                    sql = sql.replaceAll("(?i)" + table, table);// use (?i) to 
matchIgnoreCase
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return sql;
-    }
-
     private boolean checkSqlContainstable(String orig, String table) {
         // ensure table is single match(e.g match account but not match 
accountant)
         if (orig.endsWith(table.toUpperCase(Locale.ROOT))) {
@@ -192,8 +165,9 @@ private boolean checkSqlContainstable(String orig, String 
table) {
         try (Connection con = getConnection(); ResultSet rs = 
con.getMetaData().getSchemas()) {
             while (rs.next()) {
                 String schema = rs.getString("TABLE_SCHEM");
-                if (schema != null && !schema.isEmpty())
+                if (StringUtils.isNotBlank(schema)) {
                     ret.add(schema);
+                }
             }
         }
         return ret;
@@ -211,8 +185,22 @@ private boolean checkSqlContainstable(String orig, String 
table) {
         try (Connection conn = getConnection(); ResultSet rs = 
conn.getMetaData().getTables(null, schema, null, null)) {
             while (rs.next()) {
                 String name = rs.getString("TABLE_NAME");
-                if (name != null && !name.isEmpty())
+                if (StringUtils.isNotBlank(schema)) {
                     ret.add(name);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public List<String> listColumns(String database, String tableName) throws 
SQLException {
+        List<String> ret = new ArrayList<>();
+        CachedRowSet columnsRs = getTableColumns(database, tableName);
+        while (columnsRs.next()) {
+            String name = columnsRs.getString("COLUMN_NAME");
+            if (StringUtils.isNotBlank(name)) {
+                ret.add(name);
             }
         }
         return ret;
@@ -272,4 +260,37 @@ public CachedRowSet getTableColumns(String schema, String 
table) throws SQLExcep
 
         return new String[] { dropView, dropTable, createSql };
     }
+
+    /**
+     * Known defect:
+     * an identifier alone cannot tell whether it names a database, a table or a column; the lookup
+     * follows the order database -> table -> column and returns on the first match, so if a database
+     * named Test and a table named TEst both exist, Test is always returned.
+     * @param identifier the identifier whose case should be fixed
+     * @return the identifier with the case used by the source metadata
+     */
+    public String fixIdentifierCaseSensitve(String identifier) {
+        try {
+            List<String> databases = listDatabasesWithCache();
+            for (String db : databases) {
+                if (db.equalsIgnoreCase(identifier)) {
+                    return db;
+                }
+                List<String> tables = listTablesWithCache(db);
+                for (String table : tables) {
+                    if (table.equalsIgnoreCase(identifier)) {
+                        return table;
+                    }
+                    List<String> cols = listColumnsWithCache(db, table);
+                    for (String col : cols) {
+                        if (col.equalsIgnoreCase(identifier)) {
+                            return col;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return identifier;
+    }
 }
\ No newline at end of file
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
index b0c0f5f164..d849e6c010 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
@@ -85,8 +85,8 @@ public String convertSql(String orig) {
         return sqlConverter.convertSql(orig);
     }
 
-    public String convertColumn(String column) {
-        return sqlConverter.convertColumn(column);
+    public String convertColumn(String column, String originQuote) {
+        return sqlConverter.convertColumn(column, originQuote);
     }
 
     /**
@@ -117,7 +117,7 @@ public void executeUpdate(String[] sqls) throws 
SQLException {
     }
 
     public List<String> listDatabases() throws SQLException {
-        List<String> dbNames = adaptor.listDatabasesWithCache();
+        List<String> dbNames = adaptor.listDatabasesWithCache(true);
         String blackList = 
jdbcDs.getPropertyValue("schema.database.black-list-pattern");
         if (!StringUtils.isEmpty(blackList)) {
             String[] patterns = blackList.split(",");
@@ -136,7 +136,7 @@ public void executeUpdate(String[] sqls) throws 
SQLException {
     }
 
     public List<String> listTables(String schema) throws SQLException {
-        return adaptor.listTablesWithCache(schema);
+        return adaptor.listTablesWithCache(schema, true);
     }
 
     public CachedRowSet getTable(String database, String table) throws 
SQLException {
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
index 583dc7296e..ce21500494 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
@@ -55,3 +55,5 @@ private static String decideAdaptorClassName(String 
dataSourceId) {
         }
     }
 }
+
+
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
index 42fb9f1cf9..3935e1324c 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
@@ -144,12 +144,22 @@ private void doWriteLimitOffset(SqlNode fetch, SqlNode 
offset) {
 
     @Override
     public void identifier(String name) {
-        if (!configurer.skipHandleDefault() && 
name.trim().equalsIgnoreCase("default")) {
-            String quoted = getDialect().quoteIdentifier(name);
+        String convertName = name;
+        if (configurer.isCaseSensitive()) {
+            convertName = configurer.fixIdentifierCaseSensitve(name);
+        }
+        if (configurer.enableQuote()) {
+            String quoted = getDialect().quoteIdentifier(convertName);
             print(quoted);
             setNeedWhitespace(true);
         } else {
-            super.identifier(name);
+            if (!configurer.skipHandleDefault() && 
convertName.trim().equalsIgnoreCase("default")) {
+                String quoted = getDialect().quoteIdentifier(convertName);
+                print(quoted);
+                setNeedWhitespace(true);
+            } else {
+                super.identifier(convertName);
+            }
         }
     }
 
@@ -190,6 +200,11 @@ public boolean inQuery() {
                 || this.frame.getFrameType() == FrameTypeEnum.WITH_ITEM;
     }
 
+    @Override
+    public boolean isQuoteAllIdentifiers() {
+        return super.isQuoteAllIdentifiers();
+    }
+
     @Override
     public void writeWith(SqlCall call, int leftPrec, int rightPrec) {
         final SqlWith with = (SqlWith) call;
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
index 94c25262b3..6d7fb6da37 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.kylin.sdk.datasource.framework.conv;
 
-import java.sql.Connection;
-import java.sql.SQLException;
 import java.util.Locale;
 import java.util.Map;
 
@@ -80,23 +78,18 @@ public String fixAfterDefaultConvert(String orig) {
         if (this.adaptor == null) {
             return orig;
         }
-        if (isCaseSensitive()) {
-            orig = adaptor.fixCaseSensitiveSql(orig);
-        }
+        // case sensitivity is now fixed per identifier while writing SQL, so the whole-statement fix below is disabled.
+//        if (isCaseSensitive()) {
+//            orig = adaptor.fixCaseSensitiveSql(orig);
+//        }
         return adaptor.fixSql(orig);
     }
 
     @Override
-    public SqlDialect getSqlDialect() throws SQLException {
-        if (adaptor != null) {
-            try (Connection conn = this.adaptor.getConnection()) {
-                return SqlDialect.create(conn.getMetaData());
-            }
-        } else {
-            String dialectName = dsDef.getDialectName() == null ? 
dsDef.getId() : dsDef.getDialectName();
-            SqlDialect sqlDialect = 
sqlDialectMap.get(dialectName.toLowerCase(Locale.ROOT));
-            return sqlDialect == null ? sqlDialectMap.get("unkown") : 
sqlDialect;
-        }
+    public SqlDialect getSqlDialect() {
+        String dialectName = dsDef.getDialectName() == null ? dsDef.getId() : 
dsDef.getDialectName();
+        SqlDialect sqlDialect = 
sqlDialectMap.get(dialectName.toLowerCase(Locale.ROOT));
+        return sqlDialect == null ? sqlDialectMap.get("unkown") : sqlDialect;
     }
 
     @Override
@@ -121,11 +114,24 @@ public String getPagingType() {
 
     @Override
     public boolean isCaseSensitive() {
-        return 
"true".equalsIgnoreCase(dsDef.getPropertyValue("sql.case-sensitive", "false"));
+        return 
"true".equalsIgnoreCase(dsDef.getPropertyValue("sql.case-sensitive", "true"));
     }
 
     @Override
     public boolean enableCache() {
-        return 
"true".equalsIgnoreCase(dsDef.getPropertyValue("metadata.enable-cache", 
"false"));
+        return 
"true".equalsIgnoreCase(dsDef.getPropertyValue("metadata.enable-cache", 
"true"));
+    }
+
+    @Override
+    public boolean enableQuote() {
+        return 
"true".equalsIgnoreCase(dsDef.getPropertyValue("sql.enable-quote-all-identifiers",
 "true"));
+    }
+
+    @Override
+    public String fixIdentifierCaseSensitve(String orig) {
+        if (this.adaptor == null || !isCaseSensitive()) {
+            return orig;
+        }
+        return adaptor.fixIdentifierCaseSensitve(orig);
     }
 }
diff --git 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
index a055bd3200..d25c04fd61 100644
--- 
a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
+++ 
b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
@@ -41,7 +41,9 @@ public SqlConverter(IConfigurer configurer, ConvMaster 
convMaster) throws SQLExc
     }
 
     public String convertSql(String orig) {
-        String converted = orig;
+        // for jdbc source, convert quote from backtick to double quote
+        String converted = orig.replaceAll("`", "\"");
+
         if (!configurer.skipHandleDefault()) {
             String escapedDefault = SqlDialect.CALCITE
                     .quoteIdentifier(configurer.useUppercaseDefault() ? 
"DEFAULT" : "default");
@@ -64,11 +66,18 @@ public String convertSql(String orig) {
         return converted;
     }
 
-    public String convertColumn(String column) {
-        if (configurer.isCaseSensitive()) {
-            return configurer.fixAfterDefaultConvert(column);
+    public String convertColumn(String column, String originQuote) {
+        String converted = column.replace(originQuote, "");
+        try {
+            SqlNode sqlNode = SqlParser.create(converted).parseExpression();
+            sqlNode = sqlNode.accept(sqlNodeConverter);
+            converted = sqlWriter.format(sqlNode);
+        } catch (Throwable e) {
+            logger.error("Failed to default convert Column, will use the 
input: {}", column, e);
+        } finally {
+            sqlWriter.reset();
         }
-        return column;
+        return converted;
     }
 
     public IConfigurer getConfigurer() {
@@ -76,15 +85,15 @@ public IConfigurer getConfigurer() {
     }
 
     public interface IConfigurer {
-        public boolean skipDefaultConvert();
+        boolean skipDefaultConvert();
 
-        public boolean skipHandleDefault();
+        boolean skipHandleDefault();
 
-        public boolean useUppercaseDefault();
+        boolean useUppercaseDefault();
 
-        public String fixAfterDefaultConvert(String orig);
+        String fixAfterDefaultConvert(String orig);
 
-        public SqlDialect getSqlDialect() throws SQLException;
+        SqlDialect getSqlDialect() throws SQLException;
 
         boolean allowNoOffset();
 
@@ -97,5 +106,9 @@ public IConfigurer getConfigurer() {
         boolean isCaseSensitive();
 
         boolean enableCache();
+
+        boolean enableQuote();
+
+        String fixIdentifierCaseSensitve(String orig);
     }
 }
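
For reference, a quick sketch (made-up input, not from the patch) of the new normalization at the top of convertSql, which rewrites backtick quotes into ANSI double quotes before the Calcite-based conversion runs:

    public class QuoteNormalizeSketch {
        public static void main(String[] args) {
            String orig = "SELECT `PRICE` FROM `DEFAULT`.`FACT`";
            String converted = orig.replaceAll("`", "\"");   // same replacement as in convertSql
            System.out.println(converted);                   // SELECT "PRICE" FROM "DEFAULT"."FACT"
        }
    }
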
diff --git 
a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
 
b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
index 324ad918ce..7120c1a614 100644
--- 
a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
+++ 
b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
@@ -31,26 +31,33 @@ public void testConvertSql() throws SQLException {
         GenericSqlConverter sqlConverter = new GenericSqlConverter();
         // test function
         List<String> functionTestSqls = new LinkedList<>();
-        functionTestSqls.add("SELECT MIN(C1)\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT EXP(AVG(LN(EXTRACT(DOY FROM 
CAST('2018-03-20' AS DATE)))))\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT CASE WHEN SUM(C1 - C1 + 1) = 1 THEN 0 
ELSE (SUM(C1 * C1) - SUM(C1) * SUM(C1) / SUM(C1 - C1 + 1)) / (SUM(C1 - C1 + 1) 
- 1) END\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT EXTRACT(DAY FROM CAST('2018-03-20' AS 
DATE))\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT FIRST_VALUE(C1) OVER (ORDER BY C1)\nFROM 
TEST_SUITE");
-        functionTestSqls.add("SELECT SUBSTR('world', 1, CAST(2 AS 
INTEGER))\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT 2 - TRUNC(2 / NULLIF(3, 0)) * 3\nFROM 
TEST_SUITE");
-        functionTestSqls.add("SELECT CASE WHEN SUBSTRING('hello' FROM 
CAST(LENGTH('llo') - LENGTH('llo') + 1 AS INTEGER) FOR CAST(LENGTH('llo') AS 
INTEGER)) = 'llo' THEN 1 ELSE 0 END\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT SUBSTRING('world' FROM 
CAST(LENGTH('world') - 3 + 1 AS INTEGER) FOR CAST(3 AS INTEGER))\nFROM 
TEST_SUITE");
+        functionTestSqls.add("SELECT MIN(\"C1\")\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT EXP(AVG(LN(EXTRACT(DOY FROM 
CAST('2018-03-20' AS DATE)))))\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT CASE WHEN SUM(\"C1\" - \"C1\" + 1) = 1 
THEN 0 ELSE (SUM(\"C1\" * \"C1\") - SUM(\"C1\") * SUM(\"C1\") / SUM(\"C1\" - 
\"C1\" + 1)) / (SUM(\"C1\" - \"C1\" + 1) - 1) END\n" +
+                "FROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT EXTRACT(DAY FROM CAST('2018-03-20' AS 
DATE))\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT FIRST_VALUE(\"C1\") OVER (ORDER BY 
\"C1\")\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT SUBSTR('world', 1, CAST(2 AS 
INTEGER))\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT 2 - TRUNC(2 / NULLIF(3, 0)) * 3\nFROM 
\"TEST_SUITE\"");
+        functionTestSqls.add("SELECT CASE WHEN SUBSTRING('hello' FROM 
CAST(LENGTH('llo') - LENGTH('llo') + 1 AS INTEGER) FOR CAST(LENGTH('llo') AS 
INTEGER)) = 'llo' THEN 1 ELSE 0 END\n" +
+                "FROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT SUBSTRING('world' FROM 
CAST(LENGTH('world') - 3 + 1 AS INTEGER) FOR CAST(3 AS INTEGER))\n" +
+                "FROM \"TEST_SUITE\"");
 
         for (String originSql : functionTestSqls) {
             testSqlConvert(originSql, "testing", "default", sqlConverter);
         }
         // test datatype
         List<String> typeTestSqls = new LinkedList<>();
-        typeTestSqls.add("SELECT CAST(PRICE AS DOUBLE PRECISION)\nFROM 
\"default\".FACT");
-        typeTestSqls.add("SELECT CAST(PRICE AS DECIMAL(19, 4))\nFROM 
\"default\".FACT");
-        typeTestSqls.add("SELECT CAST(PRICE AS DECIMAL(19))\nFROM 
\"default\".FACT");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DOUBLE PRECISION)\n" +
+                "FROM \"default\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DECIMAL(19, 4))\n" +
+                "FROM \"default\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DECIMAL(19))\n" +
+                "FROM \"default\".\"FACT\"");
         typeTestSqls.add("SELECT CAST(BYTE AS BIT(8))\nFROM \"default\".FACT");
-        typeTestSqls.add("SELECT CAST(BYTE AS VARCHAR(1024))\nFROM 
\"default\".FACT");
+        typeTestSqls.add("SELECT CAST(\"BYTE\" AS VARCHAR(1024))\n" +
+                "FROM \"default\".\"FACT\"");
         for (String originSql : typeTestSqls) {
             testSqlConvert(originSql, "testing", "default", sqlConverter);
         }
diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
index 0224ce45a9..94cc651223 100644
--- a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
+++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.sdk.datasource.framework.conv;
 
 import java.sql.SQLException;
+import java.util.Locale;
 
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -67,7 +68,7 @@ public String fixAfterDefaultConvert(String orig) {
             }
 
             @Override
-            public SqlDialect getSqlDialect() throws SQLException {
+            public SqlDialect getSqlDialect() {
                 return SqlDialect.CALCITE;
             }
 
@@ -100,12 +101,22 @@ public boolean isCaseSensitive() {
             public boolean enableCache() {
                 return true;
             }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
         }, master);
 
         // escape default keywords
-        Assert.assertEquals("SELECT *\nFROM DEFAULT.FACT", 
converter.convertSql("select * from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"Default\".FACT", 
converter.convertSql("select * from \"Default\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"default\".FACT", 
converter.convertSql("select * from \"default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", 
converter.convertSql("select * from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", 
converter.convertSql("select * from \"Default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", 
converter.convertSql("select * from \"default\".FACT"));
     }
 
     @Test
@@ -135,7 +146,7 @@ public String fixAfterDefaultConvert(String orig) {
             }
 
             @Override
-            public SqlDialect getSqlDialect() throws SQLException {
+            public SqlDialect getSqlDialect() {
                 return SqlDialect.CALCITE;
             }
 
@@ -168,11 +179,21 @@ public boolean isCaseSensitive() {
             public boolean enableCache() {
                 return true;
             }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
         }, master);
 
         // normal cases
         Assert.assertEquals("SELECT 1", converter.convertSql("select     1"));
-        Assert.assertEquals("SELECT *\nFROM FACT", 
converter.convertSql("select * from FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"FACT\"", 
converter.convertSql("select * from FACT"));
 
         // limit and offset
         Assert.assertEquals("SELECT 1\nFETCH NEXT 1 ROWS ONLY", 
converter.convertSql("SELECT 1 LIMIT 1"));
@@ -181,56 +202,56 @@ public boolean enableCache() {
                 converter.convertSql("SELECT 1 LIMIT 1 OFFSET 1"));
 
         // escape default keywords
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", 
converter.convertSql("select * from DEFAULT.FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", 
converter.convertSql("select * from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", 
converter.convertSql("select * from \"Default\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", 
converter.convertSql("select * from \"default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", 
converter.convertSql("select * from DEFAULT.FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", 
converter.convertSql("select * from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", 
converter.convertSql("select * from \"Default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", 
converter.convertSql("select * from \"default\".FACT"));
 
         // function mapping
-        Assert.assertEquals("SELECT EXTRACT(DOY FROM PART_DT)\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT EXTRACT(DOY FROM \"PART_DT\")\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select DAYOFYEAR(PART_DT) from 
\"DEFAULT\".FACT"));
         Assert.assertEquals(
-                "SELECT 12 * (EXTRACT(YEAR FROM DT1) - EXTRACT(YEAR FROM DT2)) 
+ EXTRACT(MONTH FROM DT1) - EXTRACT(MONTH FROM DT2) "
-                        + "- CASE WHEN EXTRACT(DAY FROM DT2) > EXTRACT(DAY 
FROM DT1) THEN 1 ELSE 0 END\n"
-                        + "FROM \"DEFAULT\".FACT",
+                "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM 
\"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " +
+                        "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY 
FROM \"DT1\") THEN 1 ELSE 0 END\n" +
+                        "FROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select TIMESTAMPDIFF(month,DT2,      
DT1) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT TRUNC(ID)\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT TRUNC(\"ID\")\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(ID as INT) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT 1\nFROM A\nWHERE 1 BETWEEN ASYMMETRIC 0 
AND 2",
+        Assert.assertEquals("SELECT 1\nFROM \"A\"\nWHERE 1 BETWEEN ASYMMETRIC 
0 AND 2",
                 converter.convertSql("select 1 from a where 1 BETWEEN 0 and 
2"));
-        Assert.assertEquals("SELECT CURRENT_DATE, TEST_CURR_TIME()",
+        Assert.assertEquals("SELECT \"CURRENT_DATE\", TEST_CURR_TIME()",
                 converter.convertSql("select CURRENT_DATE, CURRENT_TIME"));
-        Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM 
CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM 
CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql(
                         "select exp(avg(ln(dayofyear(cast('2018-03-20' as 
date))))) from \"DEFAULT\".FACT"));
 
         // over function
-        Assert.assertEquals("SELECT STDDEVP(C1) OVER (ORDER BY C1)\nFROM 
TEST_SUITE\nFETCH NEXT 1 ROWS ONLY",
+        Assert.assertEquals("SELECT STDDEVP(\"C1\") OVER (ORDER BY 
\"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("select stddev_pop(c1) over(order by c1) 
from test_suite limit 1"));
 
         // type mapping
-        Assert.assertEquals("SELECT CAST(PRICE AS DOUBLE PRECISION)\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DOUBLE PRECISION)\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DOUBLE) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(PRICE AS DECIMAL(19, 4))\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DECIMAL(19, 4))\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DECIMAL(19,4)) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(PRICE AS DECIMAL(19))\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DECIMAL(19))\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DECIMAL(19)) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(BYTE AS BIT(8))\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"BYTE\" AS BIT(8))\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(BYTE as BYTE) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(BYTE AS VARCHAR(1024))\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"BYTE\" AS VARCHAR(1024))\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(BYTE as VARCHAR(1024)) from 
\"DEFAULT\".FACT"));
 
         // cannot find mapping
-        Assert.assertEquals("SELECT CURRENT_DATE_1, CURRENT_TIME_1",
+        Assert.assertEquals("SELECT \"CURRENT_DATE_1\", \"CURRENT_TIME_1\"",
                 converter.convertSql("select CURRENT_DATE_1, CURRENT_TIME_1"));
-        Assert.assertEquals("SELECT CURRENT_DATE_1, TEST_CURR_TIME(), 
CURRENT_DATE",
+        Assert.assertEquals("SELECT \"CURRENT_DATE_1\", TEST_CURR_TIME(), 
\"CURRENT_DATE\"",
                 converter.convertSql("select CURRENT_DATE_1, CURRENT_TIME, 
CURRENT_DATE"));
-        Assert.assertEquals("SELECT CAST(BYTE AS VAR(1024))\nFROM 
\"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"BYTE\" AS VAR(1024))\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(BYTE as VAR(1024)) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(PRICE AS DDD)\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DDD)\nFROM 
\"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DDD) from 
\"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT A(), B(A), CAST(PRICE AS DDD)\nFROM 
\"DEFAULT\".FACT",
-                converter.convertSql("select A(), B(A), cast(PRICE as DDD) 
from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE\" AS 
DDD)\nFROM \"DEFAULT\".\"FACT\"",
+                converter.convertSql("select A(), B(A), cast(PRICE as DDD) 
from \"DEFAULT\".\"FACT\""));
         Assert.assertEquals("SELECT ONLY_DEFAULT(1)", 
converter.convertSql("SELECT ONLY_DEFAULT(1)"));
 
         // invalid case
@@ -268,7 +289,7 @@ public String fixAfterDefaultConvert(String orig) {
             }
 
             @Override
-            public SqlDialect getSqlDialect() throws SQLException {
+            public SqlDialect getSqlDialect() {
                 return SqlDialect.CALCITE;
             }
 
@@ -301,13 +322,23 @@ public boolean isCaseSensitive() {
             public boolean enableCache() {
                 return true;
             }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
         }, master);
 
         Assert.assertEquals("SELECT 1\nORDER BY 2\nOFFSET 0 ROWS\nFETCH NEXT 1 
ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY 2 LIMIT 1"));
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 
ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY 1 LIMIT 1"));
-        Assert.assertEquals("SELECT 1\nORDER BY COL\nOFFSET 0 ROWS\nFETCH NEXT 
1 ROWS ONLY",
+        Assert.assertEquals("SELECT 1\nORDER BY \"COL\"\nOFFSET 0 ROWS\nFETCH 
NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY COL LIMIT 1"));
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 
ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY 1 LIMIT 0"));
@@ -316,4 +347,165 @@ public boolean enableCache() {
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 
ROWS ONLY",
                 converter.convertSql("SELECT 1 LIMIT 1"));
     }
+    @Test
+    public void testConvertQuotedSqlWithEscape() throws SQLException {
+        DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
+        ConvMaster master = new ConvMaster(provider.getDefault(), 
provider.getById(TEST_TARGET));
+        SqlConverter converter = new SqlConverter(new 
SqlConverter.IConfigurer() {
+
+            @Override
+            public boolean skipDefaultConvert() {
+                return false;
+            }
+
+            @Override
+            public boolean skipHandleDefault() {
+                return false;
+            }
+
+            @Override
+            public boolean useUppercaseDefault() {
+                return true;
+            }
+
+            @Override
+            public String fixAfterDefaultConvert(String orig) {
+                return orig;
+            }
+
+            @Override
+            public SqlDialect getSqlDialect() {
+                return SqlDialect.CALCITE;
+            }
+
+            @Override
+            public boolean allowNoOffset() {
+                return true;
+            }
+
+            @Override
+            public boolean allowFetchNoRows() {
+                return true;
+            }
+
+            @Override
+            public boolean allowNoOrderByWithFetch() {
+                return true;
+            }
+
+            @Override
+            public String getPagingType() {
+                return "AUTO";
+            }
+
+            @Override
+            public boolean isCaseSensitive() {
+                return false;
+            }
+
+            @Override
+            public boolean enableCache() {
+                return true;
+            }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
+        }, master);
+
+        Assert.assertEquals("SELECT SUM(\"A\"), COUNT(\"A\") AS \"AB\"\nFROM 
\"DEFAULT\".\"CUBE\"",
+                converter.convertSql("select sum(A), count(`A`) as AB from 
DEFAULT.`CUBE`"));
+        Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS 
DDD)\nFROM \"DEFAULT\".\"CUBE\"",
+                converter.convertSql("select A(), B(`A`), cast(`PRICE@@` as 
`DDD`) from DEFAULT.`CUBE`"));
+        Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS 
DDD)\nFROM \"DEFAULT\".\"CUBE\"",
+                converter.convertSql("select A(), B(\"A\"), cast(\"PRICE@@\" 
as \"DDD\") from \"DEFAULT\".\"CUBE\""));
+        Assert.assertEquals("SELECT \"kylin_sales\".\"price_@@\", 
\"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE 
\"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
+                converter.convertSql("select `kylin_sales`.`price_@@`, 
`kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` 
> 1 and `kylin_sales`.`count` < 50"));
+        Assert.assertEquals("SELECT COUNT(DISTINCT \"price_#@\")\nFROM 
\"cube\".\"kylin_sales\"",
+                converter.convertSql("select count(distinct `price_#@`) from 
`cube`.`kylin_sales`"));
+
+    }
+
+    @Test
+    public void testConvertColumn() throws SQLException, SqlParseException {
+        DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
+        ConvMaster master = new ConvMaster(provider.getDefault(), 
provider.getById(TEST_TARGET));
+        SqlConverter converter = new SqlConverter(new 
SqlConverter.IConfigurer() {
+
+            @Override
+            public boolean skipDefaultConvert() {
+                return false;
+            }
+
+            @Override
+            public boolean skipHandleDefault() {
+                return false;
+            }
+
+            @Override
+            public boolean useUppercaseDefault() {
+                return true;
+            }
+
+            @Override
+            public String fixAfterDefaultConvert(String orig) {
+                return orig;
+            }
+
+            @Override
+            public SqlDialect getSqlDialect() {
+                return SqlDialect.CALCITE;
+            }
+
+            @Override
+            public boolean allowNoOffset() {
+                return true;
+            }
+
+            @Override
+            public boolean allowFetchNoRows() {
+                return true;
+            }
+
+            @Override
+            public boolean allowNoOrderByWithFetch() {
+                return true;
+            }
+
+            @Override
+            public String getPagingType() {
+                return "AUTO";
+            }
+
+            @Override
+            public boolean isCaseSensitive() {
+                return true;
+            }
+
+            @Override
+            public boolean enableCache() {
+                return true;
+            }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig.toUpperCase(Locale.ROOT);
+            }
+        }, master);
+
+        Assert.assertEquals("\"TEST\".\"AA\"", 
converter.convertColumn("`test`.`aa`", "`"));
+        Assert.assertEquals("\"TEST\".\"AA\"", 
converter.convertColumn("`test`.aa", "`"));
+        Assert.assertEquals("\"TEST\".\"AA\"", 
converter.convertColumn("test.aa", "`"));
+    }
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
index 7dc8260980..ebfaf85862 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
@@ -64,7 +64,7 @@ private String cleanUpIntermediateFlatTable(KylinConfig 
config) throws IOExcepti
             for (String hiveTable : hiveTables) {
                 if (StringUtils.isNotEmpty(hiveTable)) {
                     hiveCmdBuilder.addStatement("USE " + 
config.getHiveDatabaseForIntermediateTable() + ";");
-                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + 
hiveTable + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  `" + 
hiveTable + "`;");
                     output.append("Hive table " + hiveTable + " is dropped. 
\n");
                 }
             }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 2c998dfb08..c55015b542 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -118,7 +118,7 @@ protected static ShellExecutable 
createLookupHiveViewMaterializationStep(String
         hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
         hiveCmdBuilder.addStatement(hiveInitStatements);
         for (TableDesc lookUpTableDesc : lookupViewsTables) {
-            String identity = lookUpTableDesc.getIdentity();
+            String identity = lookUpTableDesc.getIdentityQuoted("`");
             if (lookUpTableDesc.isView()) {
                 String intermediate = 
lookUpTableDesc.getMaterializedName(uuid);
                 String materializeViewHql = materializeViewHql(intermediate, 
identity, jobWorkingDir);
@@ -134,11 +134,11 @@ protected static ShellExecutable 
createLookupHiveViewMaterializationStep(String
     // each append must be a complete hql.
     protected static String materializeViewHql(String viewName, String 
tableName, String jobWorkingDir) {
         StringBuilder createIntermediateTableHql = new StringBuilder();
-        createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + 
";\n");
-        createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + 
viewName + " LIKE " + tableName
+        createIntermediateTableHql.append("DROP TABLE IF EXISTS `" + viewName 
+ "`;\n");
+        createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS `" + 
viewName + "` LIKE " + tableName
                 + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
-        createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET 
TBLPROPERTIES('auto.purge'='true');\n");
-        createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName 
+ " SELECT * FROM " + tableName + ";\n");
+        createIntermediateTableHql.append("ALTER TABLE `" + viewName + "` SET 
TBLPROPERTIES('auto.purge'='true');\n");
+        createIntermediateTableHql.append("INSERT OVERWRITE TABLE `" + 
viewName + "` SELECT * FROM " + tableName + ";\n");
         return createIntermediateTableHql.toString();
     }
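
For reference, the quoting behavior introduced in materializeViewHql above can be reproduced with a small standalone sketch; the class name and the sample view/table/working-dir values are illustrative only (they are not part of the patch), and the printed statements are the same ones the updated HiveMRInputTest asserts on:

    public class MaterializeViewHqlSketch {
        // Mirrors the patched HiveInputBase.materializeViewHql: the view name is
        // always wrapped in backticks, while the table name is expected to arrive
        // pre-quoted from the caller (see getIdentityQuoted("`") above).
        static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
            StringBuilder hql = new StringBuilder();
            hql.append("DROP TABLE IF EXISTS `" + viewName + "`;\n");
            hql.append("CREATE TABLE IF NOT EXISTS `" + viewName + "` LIKE " + tableName
                    + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
            hql.append("ALTER TABLE `" + viewName + "` SET TBLPROPERTIES('auto.purge'='true');\n");
            hql.append("INSERT OVERWRITE TABLE `" + viewName + "` SELECT * FROM " + tableName + ";\n");
            return hql.toString();
        }

        public static void main(String[] args) {
            // Prints, among others:
            // DROP TABLE IF EXISTS `mockedView1`;
            // CREATE TABLE IF NOT EXISTS `mockedView1` LIKE `mockedTable1` LOCATION 'mockedWorkingDir/mockedView1';
            System.out.print(materializeViewHql("mockedView1", "`mockedTable1`", "mockedWorkingDir"));
        }
    }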
 
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
index 384c2b8ba6..9e0f8297e1 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
@@ -59,7 +59,7 @@ public void TestGetJobWorkingDir() throws IOException {
     public void testMaterializeViewHql() {
         final int viewSize = 2;
         String[] mockedViewNames = { "mockedView1", "mockedView2" };
-        String[] mockedTalbeNames = { "mockedTable1", "mockedTable2" };
+        String[] mockedTalbeNames = { "`mockedTable1`", "`mockedTable2`" };
         String mockedWorkingDir = "mockedWorkingDir";
 
         StringBuilder hqls = new StringBuilder();
@@ -72,6 +72,16 @@ public void testMaterializeViewHql() {
         for (String sub : hqls.toString().split("\n")) {
             Assert.assertTrue(sub.endsWith(";"));
         }
+
+        Assert.assertEquals("DROP TABLE IF EXISTS `mockedView1`;\n"
+                + "CREATE TABLE IF NOT EXISTS `mockedView1` LIKE 
`mockedTable1` LOCATION 'mockedWorkingDir/mockedView1';\n"
+                + "ALTER TABLE `mockedView1` SET 
TBLPROPERTIES('auto.purge'='true');\n"
+                + "INSERT OVERWRITE TABLE `mockedView1` SELECT * FROM 
`mockedTable1`;\n"
+                + "DROP TABLE IF EXISTS `mockedView2`;\n"
+                + "CREATE TABLE IF NOT EXISTS `mockedView2` LIKE 
`mockedTable2` LOCATION 'mockedWorkingDir/mockedView2';\n"
+                + "ALTER TABLE `mockedView2` SET 
TBLPROPERTIES('auto.purge'='true');\n"
+                + "INSERT OVERWRITE TABLE `mockedView2` SELECT * FROM 
`mockedTable2`;\n",
+                hqls.toString());
     }
 
 }
\ No newline at end of file
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 11eb6f86f6..3460dd2bcf 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -28,6 +28,7 @@
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
@@ -156,11 +157,15 @@ protected AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Str
             TblColRef splitColRef = determineSplitColumn();
             splitTable = splitColRef.getTableRef().getTableName();
             splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = splitColRef.getExpressionInSourceDB();
+            splitColumn = 
JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
             splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase();
 
             //using sqoop to extract data from jdbc source and dump them to 
hive
             String selectSql = 
JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { 
partCol });
+            selectSql = escapeQuotationInSql(selectSql);
+
+
+
             String hiveTable = flatDesc.getTableName();
             String connectionUrl = config.getJdbcSourceConnectionUrl();
             String driverClass = config.getJdbcSourceDriver();
@@ -178,11 +183,18 @@ protected AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Str
                     if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
                                     
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        bquery += " WHERE " + 
partitionDesc.getPartitionConditionBuilder()
-                                .buildDateRangeCondition(partitionDesc, 
flatDesc.getSegment(), segRange);
+                        String quotedPartCond = 
FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
+                                
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                                        flatDesc.getSegment(), segRange),
+                                "`");
+                        bquery += " WHERE " + quotedPartCond;
                     }
                 }
             }
+            bquery = escapeQuotationInSql(bquery);
+
+            // escape ` in cmd
+            splitColumn = escapeQuotationInSql(splitColumn);
 
             String cmd = String.format(Locale.ROOT,
                     "%s/bin/sqoop import" + generateSqoopConfigArgString()
@@ -217,4 +229,10 @@ protected String generateSqoopConfigArgString() {
             return args.toString();
         }
     }
+
+    protected static String escapeQuotationInSql(String sqlExpr) {
+        sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
+        sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
+        return sqlExpr;
+    }
 }
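
The new escapeQuotationInSql helper above is easiest to understand in isolation; the following minimal sketch (the wrapper class and sample SQL string are illustrative, not part of the patch) shows how double quotes and backticks are backslash-escaped so that the generated SQL survives the double-quoted sqoop --query and --boundary-query arguments:

    public class EscapeQuotationSketch {
        // Same two regex replacements as JdbcHiveMRInput.escapeQuotationInSql:
        // every " becomes \" and every ` becomes \`.
        static String escapeQuotationInSql(String sqlExpr) {
            sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
            sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
            return sqlExpr;
        }

        public static void main(String[] args) {
            // Prints: SELECT \"PRICE\" FROM \`DEFAULT\`.\`FACT\`
            System.out.println(escapeQuotationInSql("SELECT \"PRICE\" FROM `DEFAULT`.`FACT`"));
        }
    }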
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
index ff075f7041..2e57a446c8 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
@@ -22,6 +22,7 @@
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
@@ -74,35 +75,41 @@ protected AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Str
             String splitDatabase;
             TblColRef splitColRef = determineSplitColumn();
             splitTable = splitColRef.getTableRef().getTableName();
+            splitTable = splitColRef.getTableRef().getTableDesc().getName();
             splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = splitColRef.getExpressionInSourceDB();
             //to solve case sensitive if necessary
-            splitColumn = dataSource.convertColumn(splitColumn);
+            splitColumn = 
JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
             splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
 
             //using sqoop to extract data from jdbc source and dump them to 
hive
             String selectSql = 
JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { 
partCol });
-            selectSql = 
StringUtils.escapeString(dataSource.convertSql(selectSql), '\\', '"');
+            selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
 
             String hiveTable = flatDesc.getTableName();
             String sqoopHome = config.getSqoopHome();
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), 
max(%s) FROM \"%s\".%s as %s", splitColumn, splitColumn,
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), 
max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
                     splitDatabase, splitTable, splitTableAlias);
+            bquery = dataSource.convertSql(bquery);
             if (partitionDesc.isPartitioned()) {
                 SegmentRange segRange = flatDesc.getSegRange();
                 if (segRange != null && !segRange.isInfinite()) {
                     if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
                             
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        bquery += " WHERE " + 
partitionDesc.getPartitionConditionBuilder()
-                                .buildDateRangeCondition(partitionDesc, 
flatDesc.getSegment(), segRange);
+                        String quotedPartCond = 
FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
+                                
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                                        flatDesc.getSegment(), segRange),
+                                "`");
+                        bquery += " WHERE " + quotedPartCond;
                     }
                 }
             }
-            bquery = StringUtils.escapeString(dataSource.convertSql(bquery), 
'\\', '"');
+            bquery = escapeQuotationInSql(bquery);
+
+            splitColumn = 
escapeQuotationInSql(dataSource.convertColumn(splitColumn, 
FlatTableSqlQuoteUtils.QUOTE));
 
             String cmd = StringUtils.format(
                     "--connect \"%s\" --driver %s --username %s --password %s 
--query \"%s AND \\$CONDITIONS\" "
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
index 7f104321a7..956f86c397 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
@@ -70,10 +70,8 @@ public void testGenSqoopCmd_Partition() throws IOException {
         String cmd = executable.getParam("cmd");
         Assert.assertTrue(cmd.contains("org.h2.Driver"));
         Assert.assertTrue(cmd.contains(
-                "--boundary-query \"SELECT MIN(TEST_KYLIN_FACT.LEAF_CATEG_ID), 
MAX(TEST_KYLIN_FACT.LEAF_CATEG_ID)\n"
-                        + "FROM \\\"DEFAULT\\\".TEST_KYLIN_FACT AS 
TEST_KYLIN_FACT\n"
-                        + "WHERE TEST_KYLIN_FACT.CAL_DT >="));
-
+                "--boundary-query \"SELECT 
MIN(\\\"TEST_KYLIN_FACT\\\".\\\"LEAF_CATEG_ID\\\"), 
MAX(\\\"TEST_KYLIN_FACT\\\".\\\"LEAF_CATEG_ID\\\")\n"
+                        + "FROM \\\"DEFAULT\\\".\\\"TEST_KYLIN_FACT\\\" AS 
\\\"TEST_KYLIN_FACT\\\""));
         source.close();
     }
 
@@ -97,8 +95,8 @@ public void testGenSqoopCmd_NoPartition() throws IOException {
         String cmd = executable.getParam("cmd");
         Assert.assertTrue(cmd.contains("org.h2.Driver"));
         Assert.assertTrue(
-                cmd.contains("--boundary-query \"SELECT 
MIN(TEST_KYLIN_FACT.CAL_DT), MAX(TEST_KYLIN_FACT.CAL_DT)\n"
-                        + "FROM \\\"DEFAULT\\\".TEST_KYLIN_FACT AS 
TEST_KYLIN_FACT\""));
+                cmd.contains("--boundary-query \"SELECT 
MIN(\\\"TEST_KYLIN_FACT\\\".\\\"CAL_DT\\\"), 
MAX(\\\"TEST_KYLIN_FACT\\\".\\\"CAL_DT\\\")\n"
+                        + "FROM \\\"DEFAULT\\\".\\\"TEST_KYLIN_FACT\\\" AS 
\\\"TEST_KYLIN_FACT\\\"\""));
         source.close();
     }
 
@@ -123,8 +121,8 @@ public void testGenSqoopCmd_WithLookupShardBy() throws 
IOException {
         String cmd = executable.getParam("cmd");
         Assert.assertTrue(cmd.contains("org.h2.Driver"));
         Assert.assertTrue(cmd.contains(
-                "--boundary-query \"SELECT 
MIN(TEST_CATEGORY_GROUPINGS.META_CATEG_NAME), 
MAX(TEST_CATEGORY_GROUPINGS.META_CATEG_NAME)\n"
-                        + "FROM \\\"DEFAULT\\\".TEST_CATEGORY_GROUPINGS AS 
TEST_CATEGORY_GROUPINGS\""));
+                "--boundary-query \"SELECT 
MIN(\\\"TEST_CATEGORY_GROUPINGS\\\".\\\"META_CATEG_NAME\\\"), 
MAX(\\\"TEST_CATEGORY_GROUPINGS\\\".\\\"META_CATEG_NAME\\\")\n"
+                        + "FROM 
\\\"DEFAULT\\\".\\\"TEST_CATEGORY_GROUPINGS\\\" AS 
\\\"TEST_CATEGORY_GROUPINGS\\\"\""));
 
         source.close();
     }
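
Taken together, the new testConvertColumn assertions earlier in the diff pin down the expected contract for quoting a single (possibly qualified) column: source-side backticks or bare identifiers come out as upper-cased, double-quoted identifiers. The rough standalone sketch below is an illustrative reimplementation of that contract only, not the actual SqlConverter.convertColumn code:

    import java.util.Locale;

    public class ConvertColumnSketch {
        // Illustrative only: strip the source quote character from each part of a
        // qualified column, upper-case it (mimicking the test configurer's
        // fixIdentifierCaseSensitve), and re-quote with ANSI double quotes.
        static String convertColumn(String column, String originQuote) {
            StringBuilder out = new StringBuilder();
            for (String part : column.split("\\.")) {
                String identifier = part.replace(originQuote, "");
                if (out.length() > 0) {
                    out.append(".");
                }
                out.append("\"").append(identifier.toUpperCase(Locale.ROOT)).append("\"");
            }
            return out.toString();
        }

        public static void main(String[] args) {
            // Each call prints "TEST"."AA", matching the three assertions in the test.
            System.out.println(convertColumn("`test`.`aa`", "`"));
            System.out.println(convertColumn("`test`.aa", "`"));
            System.out.println(convertColumn("test.aa", "`"));
        }
    }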


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
