lidongsjtu closed pull request #141:  KYLIN-3345    KYLIN-3363     KYLIN-3380
URL: https://github.com/apache/kylin/pull/141
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a foreign (forked) pull
request is merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3b2883c7f9..689d08fc79 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -852,6 +852,10 @@ public int getSqoopMapperNum() {
         return 
Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4"));
     }
 
+    public Map<String, String> getSqoopConfigOverride() {
+        return 
getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override.");
+    }
+    
     public String getJdbcSourceFieldDelimiter() {
         return getOptional("kylin.source.jdbc.field-delimiter", "|");
     }
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 1823dfcc30..1c023aa8aa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -31,7 +31,6 @@
 public class HiveCmdBuilder {
     public static final Logger logger = 
LoggerFactory.getLogger(HiveCmdBuilder.class);
 
-    public static final String HIVE_CONF_FILENAME = "kylin_hive_conf";
     static final String CREATE_HQL_TMP_FILE_TEMPLATE = "cat >%s<<EOL\n%sEOL";
 
     public enum HiveClientMode {
@@ -44,7 +43,7 @@
 
     public HiveCmdBuilder() {
         kylinConfig = KylinConfig.getInstanceFromEnv();
-        hiveConfProps = HiveConfigurationUtil.loadHiveConfiguration();
+        hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration();
     }
 
     public String build() {
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
 
b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
similarity index 70%
rename from 
core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
rename to 
core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
index 1c6f985a66..7e61d625c0 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import static org.apache.kylin.common.util.HiveCmdBuilder.HIVE_CONF_FILENAME;
-
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
@@ -38,11 +36,14 @@
  * @author ycq
  * @since 2018-03-05
  */
-public class HiveConfigurationUtil {
+public class SourceConfigurationUtil {
 
-    private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(HiveConfigurationUtil.class);
+    private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(SourceConfigurationUtil.class);
     private static final String HIVE_CONF_PREFIX = "hiveconf:";
 
+    public static final String HIVE_CONF_FILENAME = "kylin_hive_conf";
+    public static final String SQOOP_CONF_FILENAME = "kylin_sqoop_conf";
+
     public static Properties loadHiveJDBCProperties() {
         Map<String, String> hiveConfiguration = loadHiveConfiguration();
         Properties ret = new Properties();
@@ -53,27 +54,38 @@ public static Properties loadHiveJDBCProperties() {
     }
 
     public static Map<String, String> loadHiveConfiguration() {
-        Map<String, String> hiveConfProps = new HashMap<>();
-        File hiveConfFile;
-        String hiveConfFileName = (HIVE_CONF_FILENAME + ".xml");
+        return loadXmlConfiguration(HIVE_CONF_FILENAME, true);
+    }
+
+    public static Map<String, String> loadSqoopConfiguration() {
+        return loadXmlConfiguration(SQOOP_CONF_FILENAME, false);
+    }
+
+    private static Map<String, String> loadXmlConfiguration(String filename, 
boolean checkExist) {
+        Map<String, String> confProps = new HashMap<>();
+        File confFile;
+        String xmlFileName = filename + ".xml";
         String path = System.getProperty(KylinConfig.KYLIN_CONF);
 
         if (StringUtils.isNotEmpty(path)) {
-            hiveConfFile = new File(path, hiveConfFileName);
+            confFile = new File(path, xmlFileName);
         } else {
             path = KylinConfig.getKylinHome();
             if (StringUtils.isEmpty(path)) {
-                logger.error("KYLIN_HOME is not set, can not locate hive conf: 
{}.xml", HIVE_CONF_FILENAME);
-                return hiveConfProps;
+                logger.error("KYLIN_HOME is not set, can not locate conf: {}", 
xmlFileName);
+                return confProps;
             }
-            hiveConfFile = new File(path + File.separator + "conf", 
hiveConfFileName);
+            confFile = new File(path + File.separator + "conf", xmlFileName);
         }
 
-        if (!hiveConfFile.exists()) {
-            throw new RuntimeException("Failed to read " + HIVE_CONF_FILENAME 
+ ".xml");
+        if (!confFile.exists()) {
+            if (checkExist)
+                throw new RuntimeException("Failed to read " + xmlFileName);
+            else
+                return confProps;
         }
 
-        String fileUrl = 
OptionsHelper.convertToFileURL(hiveConfFile.getAbsolutePath());
+        String fileUrl = 
OptionsHelper.convertToFileURL(confFile.getAbsolutePath());
 
         try {
             File file = new File(fileUrl);
@@ -82,19 +94,19 @@ public static Properties loadHiveJDBCProperties() {
                 DocumentBuilder builder = factory.newDocumentBuilder();
                 Document doc = builder.parse(file);
                 NodeList nl = doc.getElementsByTagName("property");
-                hiveConfProps.clear();
+                confProps.clear();
                 for (int i = 0; i < nl.getLength(); i++) {
                     String key = 
doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
                     String value = 
doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
                     if (!key.equals("tmpjars")) {
-                        hiveConfProps.put(key, value);
+                        confProps.put(key, value);
                     }
                 }
             }
         } catch (Exception e) {
             throw new RuntimeException("Failed to parse hive conf file ", e);
         }
-        return hiveConfProps;
+        return confProps;
     }
 
 }
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/util/SourceConfigurationUtilTest.java
similarity index 73%
rename from 
core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
rename to 
core-common/src/test/java/org/apache/kylin/common/util/SourceConfigurationUtilTest.java
index d4019a9332..9e1a65ff78 100644
--- 
a/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/util/SourceConfigurationUtilTest.java
@@ -18,14 +18,17 @@
 
 package org.apache.kylin.common.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Map;
 import java.util.Properties;
 
 import org.junit.Before;
 import org.junit.Test;
 
-public class HiveConfigurationUtilTest {
+public class SourceConfigurationUtilTest {
     @Before
     public void setup() {
         System.setProperty("log4j.configuration", 
"file:../build/conf/kylin-tools-log4j.properties");
@@ -34,7 +37,14 @@ public void setup() {
 
     @Test
     public void testHiveConf() {
-        Properties properties = HiveConfigurationUtil.loadHiveJDBCProperties();
+        Properties properties = 
SourceConfigurationUtil.loadHiveJDBCProperties();
         
assertTrue(properties.containsKey("hiveconf:hive.auto.convert.join.noconditionaltask.size"));
     }
+
+    @Test
+    public void testSqoopConf() {
+        Map<String, String> configMap = 
SourceConfigurationUtil.loadSqoopConfiguration();
+        assertFalse(configMap.isEmpty());
+        assertEquals("1", configMap.get("dfs.replication"));
+    }
 }
diff --git a/examples/test_case_data/localmeta/kylin_sqoop_conf.xml 
b/examples/test_case_data/localmeta/kylin_sqoop_conf.xml
new file mode 100644
index 0000000000..de31211202
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin_sqoop_conf.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+        <description>Used in UT</description>
+    </property>
+</configuration>
diff --git a/pom.xml b/pom.xml
index a8e8312e1c..49f2da9c26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
-        <version>16</version>
+        <version>19</version>
         <relativePath />
         <!-- no parent resolution -->
     </parent>
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index 52b752c7f2..b1021dc885 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -30,7 +30,7 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DBUtils;
-import org.apache.kylin.common.util.HiveConfigurationUtil;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -60,7 +60,7 @@ public BeelineHiveClient(String beelineParams) {
                 password = stripQuotes(splits[i + 1]);
             }
         }
-        Properties jdbcProperties = 
HiveConfigurationUtil.loadHiveJDBCProperties();
+        Properties jdbcProperties = 
SourceConfigurationUtil.loadHiveJDBCProperties();
         jdbcProperties.put(HIVE_AUTH_PASSWD, password);
         jdbcProperties.put(HIVE_AUTH_USER, username);
         this.init(url, jdbcProperties);
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 457c832227..fcdb516404 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -30,6 +31,7 @@
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
 import org.apache.kylin.metadata.model.TableRef;
@@ -38,6 +40,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 public class JdbcHiveMRInput extends HiveMRInput {
 
     private static final Logger logger = 
LoggerFactory.getLogger(JdbcHiveMRInput.class);
@@ -53,7 +57,7 @@ public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flat
         public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             super(flatDesc);
         }
-        
+
         private KylinConfig getConfig() {
             return flatDesc.getDataModel().getConfig();
         }
@@ -140,20 +144,19 @@ private AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Strin
             KylinConfig config = getConfig();
             PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
             String partCol = null;
-            String partitionString = null;
 
             if (partitionDesc.isPartitioned()) {
                 partCol = 
partitionDesc.getPartitionDateColumn();//tablename.colname
-                partitionString = 
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                        flatDesc.getSegment(), flatDesc.getSegRange());
             }
 
             String splitTable;
+            String splitTableAlias;
             String splitColumn;
             String splitDatabase;
             TblColRef splitColRef = determineSplitColumn();
             splitTable = splitColRef.getTableRef().getTableName();
-            splitColumn = splitColRef.getName();
+            splitTableAlias = splitColRef.getTableAlias();
+            splitColumn = splitColRef.getExpressionInSourceDB();
             splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase();
 
             //using sqoop to extract data from jdbc source and dump them to 
hive
@@ -167,22 +170,29 @@ private AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Strin
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format("SELECT min(%s), max(%s) FROM 
%s.%s", splitColumn, splitColumn, splitDatabase,
-                    splitTable);
-            if (partitionString != null) {
-                bquery += " WHERE " + partitionString;
+            String bquery = String.format("SELECT min(%s), max(%s) FROM 
\"%s\".%s as %s", splitColumn, splitColumn,
+                    splitDatabase, splitTable, splitTableAlias);
+            if (partitionDesc.isPartitioned()) {
+                SegmentRange segRange = flatDesc.getSegRange();
+                if (segRange != null && !segRange.isInfinite()) {
+                    if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
+                            && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
+                                    
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                        bquery += " WHERE " + 
partitionDesc.getPartitionConditionBuilder()
+                                .buildDateRangeCondition(partitionDesc, 
flatDesc.getSegment(), segRange);
+                    }
+                }
             }
 
             //related to 
"kylin.engine.mr.config-override.mapreduce.job.queuename"
             String queueName = getSqoopJobQueueName(config);
-            String cmd = String.format(
-                    "%s/sqoop import 
-Dorg.apache.sqoop.splitter.allow_text_splitter=true "
-                            + "-Dmapreduce.job.queuename=%s "
-                            + "--connect \"%s\" --driver %s --username %s 
--password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s.%s 
--boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, queueName, connectionUrl, driverClass, 
jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
-                    splitTable, splitColumn, bquery, filedDelimiter, 
mapperNum);
+            String cmd = String.format("%s/sqoop import 
-Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+                    + generateSqoopConfigArgString()
+                    + "--connect \"%s\" --driver %s --username %s --password 
%s --query \"%s AND \\$CONDITIONS\" "
+                    + "--target-dir %s/%s --split-by %s.%s --boundary-query 
\"%s\" --null-string '' "
+                    + "--fields-terminated-by '%s' --num-mappers %d", 
sqoopHome, connectionUrl, driverClass, jdbcUser,
+                    jdbcPass, selectSql, jobWorkingDir, hiveTable, splitTable, 
splitColumn, bquery, filedDelimiter,
+                    mapperNum);
             logger.debug(String.format("sqoop cmd:%s", cmd));
             CmdStep step = new CmdStep();
             step.setCmd(cmd);
@@ -194,5 +204,19 @@ private AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Strin
         protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
             // skip
         }
+
+        protected String generateSqoopConfigArgString() {
+            KylinConfig kylinConfig = getConfig();
+            Map<String, String> config = Maps.newHashMap();
+            config.put("mapreduce.job.queuename", 
getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
+            config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
+            config.putAll(kylinConfig.getSqoopConfigOverride());
+
+            StringBuilder args = new StringBuilder();
+            for (Map.Entry<String, String> entry : config.entrySet()) {
+                args.append(" -D" + entry.getKey() + "=" + entry.getValue() + 
" ");
+            }
+            return args.toString();
+        }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to