[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4636 ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r141084730

Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
Commented line (removed by this PR):

    - def mkWritableTypeInfo[T <: Writable : c.WeakTypeTag](

Comment: Scratch that, this actually still works and I added a test for that in the Hadoop compat package.
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r139651625

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -0,0 +1,119 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be used if Hadoop
+ * is on the classpath.
+ */
+public class HadoopUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+    private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+    public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
+
+        Configuration result = new Configuration();
+        boolean foundHadoopConfiguration = false;
+
+        // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path
+        // and the hdfs configuration.
+        // Try to load HDFS configuration from Hadoop's own configuration files.
+        // 1. approach: Flink configuration
+        final String hdfsDefaultPath =
+            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+
+        if (hdfsDefaultPath != null) {
+            result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
+            LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
+            foundHadoopConfiguration = true;
+        } else {
+            LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
+        }
+
+        final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+        if (hdfsSitePath != null) {
+            result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
+            LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
+            foundHadoopConfiguration = true;
+        } else {
+            LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
+        }
+
+        // 2. approach: environment variables
+        String[] possibleHadoopConfPaths = new String[4];
+        possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+        possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+        if (System.getenv("HADOOP_HOME") != null) {
+            possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
+            possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
+        }
+
+        for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+            if (possibleHadoopConfPath != null) {
+                if (new File(possibleHadoopConfPath).exists()) {
+                    if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+                        result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+                        LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to
---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r139481104

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
(quotes the same @@ -0,0 +1,119 @@ getHadoopConfiguration() hunk as the previous message; the comment body is missing from the archive)
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137215719

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
(quotes an earlier @@ -0,0 +1,114 @@ revision of the getHadoopConfiguration() hunk, using a `retConf` variable and no `foundHadoopConfiguration` flag; the comment body is missing from the archive)
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137215695

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
Commented line:

    if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {

Comment: will do
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137215680

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
Commented line:

    retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));

Comment: will do
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137213490

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
Commented line:

    LOG.debug("Cannot find hdfs-default configuration file");

Comment: You're right, I just copied these from another `HadoopUtils`. I'm fixing.
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137213174

Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
Context: the hunk in runYarnTaskManager() that constructs the SecurityUtils.SecurityConfiguration (with or without a Hadoop Configuration) and installs the HadoopModule (the quoted diff is garbled in the archive)

Comment: fixing
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137213196

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
Commented lines:

    final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants
        .HDFS_DEFAULT_CONFIG, null);

Comment: indeed, I'm fixing
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137212834

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
Commented lines:

    /* Return the Kerberos login module name */
    public static String getKrb5LoginModuleName() {

Comment: Yes, this had a dependency on `KerberosUtil` from Hadoop just for this method. Now we can have Kerberos independent of Hadoop.
---
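A Hadoop-free version of such a method is small enough to sketch. The following is a hedged illustration, not Flink's actual code: the class name `Krb5Modules` is made up, and the vendor check mirrors the one Hadoop's `KerberosUtil` is commonly described as doing (IBM JDKs ship their own JAAS Kerberos login module; HotSpot-derived JVMs use the `com.sun` one).

```java
public class Krb5Modules {

    /**
     * Returns the JAAS login module class name for Kerberos without
     * touching any Hadoop class. IBM JDKs bundle their own module;
     * every other mainstream JVM uses the com.sun one.
     */
    public static String getKrb5LoginModuleName() {
        return System.getProperty("java.vendor").contains("IBM")
            ? "com.ibm.security.auth.module.Krb5LoginModule"
            : "com.sun.security.auth.module.Krb5LoginModule";
    }

    public static void main(String[] args) {
        System.out.println(getKrb5LoginModuleName());
    }
}
```

The returned name would then be passed into an `AppConfigurationEntry` exactly as before; only the Hadoop import disappears.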
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137212492

Diff: flink-tests/pom.xml
Context: a new dependency on flink-shaded-hadoop2 (groupId org.apache.flink, version ${project.version}); the XML tags were stripped during archiving

Comment: I'll add test scope and see if everything still runs.
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137212206

Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
Commented line (removed by this PR):

    - def mkWritableTypeInfo[T <: Writable : c.WeakTypeTag](

Comment: Unfortunately, this means that users now have to manually specify a `TypeInformation` that they can get from `TypeExtractor.createHadoopWritableTypeInfo(MyWritable.class)`. I'm not sure how often people are using Hadoop Writables in their Scala code but this is definitely something that will break.
---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r137211750

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
Commented lines (in logEnvironmentInfo()):

    try {
        Class.forName(
            "org.apache.hadoop.util.VersionInfo",
            false,
            EnvironmentInformation.class.getClassLoader());
        log.info(" Hadoop version: " + VersionInfo.getVersion());

Comment: Yes, that is intended because I didn't want to fiddle with the reflection API. Ideally, I would like to do this:

```
try {
    log.info(" Hadoop version: " + VersionInfo.getVersion());
} catch (ClassNotFoundException e) {
    // ignore
}
```

but Java won't let you do this: `ClassNotFoundException` is a checked exception, and the compiler rejects a catch clause for a checked exception that nothing in the try block can throw. With the explicit `Class.forName()`, which declares `throws ClassNotFoundException`, the catch block is accepted.
---
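The pattern described in that comment can be isolated into a self-contained example (class and method names here are illustrative, not Flink's): probing for a class via `Class.forName` turns a missing optional dependency into a catchable checked `ClassNotFoundException`, instead of a `NoClassDefFoundError` at link time.

```java
public class OptionalDependency {

    /**
     * Returns true if the named class can be resolved on the classpath.
     * The 'false' argument asks the loader not to run static initializers,
     * so the probe has no side effects.
     */
    public static boolean isPresent(String className) {
        try {
            Class.forName(className, false, OptionalDependency.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isPresent("java.util.ArrayList"));     // prints: true
        System.out.println(isPresent("org.example.DoesNotExist")); // prints: false
    }
}
```

Only after `isPresent(...)` returns true is it safe to enter code that statically references the optional class (such as the `VersionInfo.getVersion()` call above).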
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136822032

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
Commented line:

    LOG.debug("Cannot find hdfs-default configuration file");

Comment: this should say that they could not be loaded from the flink configuration
---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136822393 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.ConfigConstants; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Collection; + +/** + * Utility class for working with Hadoop-related classes. This should only be used if Hadoop + * is on the classpath. 
+ */ +public class HadoopUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) { + + Configuration retConf = new Configuration(); + + // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and + // the hdfs configuration + // Try to load HDFS configuration from Hadoop's own configuration files + // 1. approach: Flink configuration + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants + .HDFS_DEFAULT_CONFIG, null); + if (hdfsDefaultPath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); + } else { + LOG.debug("Cannot find hdfs-default configuration file"); + } + + final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + if (hdfsSitePath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); + } else { + LOG.debug("Cannot find hdfs-site configuration file"); + } + + // 2. 
Approach environment variables + String[] possibleHadoopConfPaths = new String[4]; + possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); + + if (System.getenv("HADOOP_HOME") != null) { + possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf"; + possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2 + } + + for (String possibleHadoopConfPath : possibleHadoopConfPaths) { + if (possibleHadoopConfPath != null) { + if (new File(possibleHadoopConfPath).exists()) { + if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) { --- End diff -- Let's track whether any of these succeeded and log something otherwise (mirroring the flink configuration approach). ---
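zentol's suggestion — track whether any of the candidate directories actually yielded a config file, and log once if none did — could be sketched as below. This is a hypothetical, self-contained illustration of the control flow (plain java.io.File checks stand in for Hadoop's Configuration.addResource, and ConfPathScan is an invented name), not the code that was merged:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ConfPathScan {

    /**
     * Scans candidate Hadoop conf directories and returns the config files
     * that actually exist. An empty result means no candidate yielded a
     * core-site.xml or hdfs-site.xml, which is the condition the caller
     * would emit a single debug log about (mirroring the Flink-configuration
     * approach above).
     */
    public static List<File> findHadoopConfFiles(String[] candidateDirs) {
        List<File> found = new ArrayList<>();
        for (String dir : candidateDirs) {
            if (dir == null || !new File(dir).exists()) {
                continue;
            }
            for (String name : new String[] {"core-site.xml", "hdfs-site.xml"}) {
                File f = new File(dir, name);
                if (f.exists()) {
                    found.add(f);
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<File> found = findHadoopConfFiles(new String[] {
            null,                              // e.g. unset fs.hdfs.hadoopconf
            System.getenv("HADOOP_CONF_DIR")   // may also be null
        });
        if (found.isEmpty()) {
            // single summary log instead of one message per missing path
            System.out.println("Could not load Hadoop configuration from environment variables");
        }
    }
}
```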
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136821811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java --- @@ -0,0 +1,114 @@ +/** + * Utility class for working with Hadoop-related classes. This should only be used if Hadoop + * is on the classpath. 
+ */ +public class HadoopUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) { + + Configuration retConf = new Configuration(); + + // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and + // the hdfs configuration + // Try to load HDFS configuration from Hadoop's own configuration files + // 1. approach: Flink configuration + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants + .HDFS_DEFAULT_CONFIG, null); --- End diff -- this is a rather odd line-break ---
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136822467 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java --- @@ -0,0 +1,114 @@ +/** + * Utility class for working with Hadoop-related classes. This should only be used if Hadoop + * is on the classpath. 
+ */ +public class HadoopUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) { + + Configuration retConf = new Configuration(); + + // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and + // the hdfs configuration + // Try to load HDFS configuration from Hadoop's own configuration files + // 1. approach: Flink configuration + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants + .HDFS_DEFAULT_CONFIG, null); + if (hdfsDefaultPath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); + } else { + LOG.debug("Cannot find hdfs-default configuration file"); + } + + final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + if (hdfsSitePath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); --- End diff -- add debug statement to mirror environment variables approach ---
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136818368 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala --- @@ -320,14 +315,6 @@ private[flink] trait TypeInformationGen[C <: Context] { } } - def mkWritableTypeInfo[T <: Writable : c.WeakTypeTag]( --- End diff -- What exactly does this removal mean for supporting `Writable`? Does hadoop-compat take care of that? ---
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136822623 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java --- @@ -0,0 +1,114 @@ +/** + * Utility class for working with Hadoop-related classes. This should only be used if Hadoop + * is on the classpath. 
+ */ +public class HadoopUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) { + + Configuration retConf = new Configuration(); + + // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and + // the hdfs configuration + // Try to load HDFS configuration from Hadoop's own configuration files + // 1. approach: Flink configuration + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants + .HDFS_DEFAULT_CONFIG, null); + if (hdfsDefaultPath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); + } else { + LOG.debug("Cannot find hdfs-default configuration file"); + } + + final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + if (hdfsSitePath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); + } else { + LOG.debug("Cannot find hdfs-site configuration file"); + } + + // 2. 
Approach environment variables + String[] possibleHadoopConfPaths = new String[4]; + possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); + + if (System.getenv("HADOOP_HOME") != null) { + possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf"; + possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2 + } + + for (String possibleHadoopConfPath : possibleHadoopConfPaths) { + if (possibleHadoopConfPath != null) { + if (new File(possibleHadoopConfPath).exists()) { + if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) { + retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml")); + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration"); + } + } + if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) { + retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136819789 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java --- @@ -52,6 +51,13 @@ private static final AppConfigurationEntry userKerberosAce; + /* Return the Kerberos login module name */ + public static String getKrb5LoginModuleName() { --- End diff -- I assume this was copied from hadoop? ---
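For context on the question: Hadoop's own Kerberos utilities pick the JAAS login module by JVM vendor, since IBM JVMs ship a different Krb5LoginModule class than Sun/Oracle JVMs. A minimal sketch of that pattern follows — Krb5Module is an invented class name for illustration; the two fully qualified module names are the well-known Sun and IBM ones:

```java
public class Krb5Module {

    // IBM JVMs report a vendor string containing "IBM" and ship their own
    // JAAS Kerberos login module under com.ibm.security.
    private static final boolean IBM_JAVA =
        System.getProperty("java.vendor", "").contains("IBM");

    /** Returns the Kerberos JAAS login module name for the running JVM vendor. */
    public static String getKrb5LoginModuleName() {
        return IBM_JAVA
            ? "com.ibm.security.auth.module.Krb5LoginModule"
            : "com.sun.security.auth.module.Krb5LoginModule";
    }

    public static void main(String[] args) {
        System.out.println(getKrb5LoginModuleName());
    }
}
```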
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136821489 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java --- @@ -112,29 +114,29 @@ public static void runYarnTaskManager(String[] args, final Class new HadoopModule(securityConfig, hadoopConfiguration))); - SecurityUtils.SecurityConfiguration sc; - if (hadoopConfiguration != null) { - sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration); } else { sc = new SecurityUtils.SecurityConfiguration(configuration); + --- End diff -- revert ---
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136817884 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java --- @@ -284,7 +284,16 @@ public static void logEnvironmentInfo(Logger log, String componentName, String[] log.info(" JVM: " + jvmVersion); log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes"); log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome)); - log.info(" Hadoop version: " + VersionInfo.getVersion()); + + try { + Class.forName( + "org.apache.hadoop.util.VersionInfo", + false, + EnvironmentInformation.class.getClassLoader()); + log.info(" Hadoop version: " + VersionInfo.getVersion()); --- End diff -- Did you intend to directly call `VersionInfo`, or should we maybe do this with reflection instead? ---
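The reflection variant zentol alludes to could look roughly like the sketch below: a generic, self-contained helper (HadoopVersionProbe and its method are invented names, not from the PR) that avoids any compile-time reference to org.apache.hadoop.util.VersionInfo, falling back gracefully when the class is not on the classpath:

```java
public class HadoopVersionProbe {

    /**
     * Tries to invoke a static no-arg getVersion() on the named class via
     * reflection, so the caller neither imports nor links against it.
     * Returns the fallback when the class or method is absent.
     */
    public static String getVersion(String className, String fallback) {
        try {
            Class<?> clazz = Class.forName(className);
            Object version = clazz.getMethod("getVersion").invoke(null);
            return String.valueOf(version);
        } catch (ReflectiveOperationException e) {
            // ClassNotFoundException, NoSuchMethodException, etc.
            return fallback;
        }
    }

    public static void main(String[] args) {
        // With Hadoop absent from the classpath this prints the fallback.
        System.out.println("Hadoop version: "
            + getVersion("org.apache.hadoop.util.VersionInfo", "(not on classpath)"));
    }
}
```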
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136818648 --- Diff: flink-tests/pom.xml --- @@ -54,6 +54,12 @@ under the License. org.apache.flink + flink-shaded-hadoop2 + ${project.version} + --- End diff -- add test scope? ---
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4636#discussion_r136821705 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java --- @@ -0,0 +1,114 @@ +/** + * Utility class for working with Hadoop-related classes. This should only be used if Hadoop + * is on the classpath. 
+ */ +public class HadoopUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) { + + Configuration retConf = new Configuration(); + + // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and + // the hdfs configuration + // Try to load HDFS configuration from Hadoop's own configuration files + // 1. approach: Flink configuration + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants + .HDFS_DEFAULT_CONFIG, null); + if (hdfsDefaultPath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); + } else { + LOG.debug("Cannot find hdfs-default configuration file"); + } + + final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + if (hdfsSitePath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); + } else { + LOG.debug("Cannot find hdfs-site configuration file"); + } + + // 2. 
Approach environment variables + String[] possibleHadoopConfPaths = new String[4]; + possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); + + if (System.getenv("HADOOP_HOME") != null) { + possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf"; + possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2 + } + + for (String possibleHadoopConfPath : possibleHadoopConfPaths) { + if (possibleHadoopConfPath != null) { + if (new File(possibleHadoopConfPath).exists()) { + if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) { + retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml")); + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration"); + } + } + if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) { + retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
[GitHub] flink pull request #4636: [FLINK-2268] Allow Flink binary release without Ha...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4636 [FLINK-2268] Allow Flink binary release without Hadoop This is a series of PRs that allows running Flink without any Hadoop dependencies in the lib folder. Each PR stands on its own but all of them are necessary for the last commit to work. The commits themselves clearly document what is changed. R: @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink hadoop-free-flink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4636.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4636

commit 178c227f9a78b6fa8eca89d93b79f47d6d2cfda5
Author: Aljoscha Krettek
Date: 2017-08-21T17:55:57Z
[FLINK-4048] Remove Hadoop from DataSet API
This removes all Hadoop-related methods from ExecutionEnvironment (there are already equivalent methods in flink-hadoop-compatibility; see HadoopUtils and HadoopInputs, etc.). This also removes Hadoop-specific tests from flink-tests because these are duplicated by tests in flink-hadoop-compatibility. This also removes Hadoop-specific example code from flink-examples: the DistCp example and related code. 
commit b0f76980ccab2cd4b742e78453b788aea93c9680
Author: Aljoscha Krettek
Date: 2017-08-22T14:40:28Z
[FLINK-2268] Remove Hadoop-related Akka Serializers from runtime

commit 4af83b119473f0245ea42be34dcb44099fd7af19
Author: Aljoscha Krettek
Date: 2017-08-22T14:42:22Z
[FLINK-2268] Remove unused HDFS copy-utils from flink-streaming-java

commit 524f30bc8ffe329b7b0559ad8499148bf9707f3b
Author: Aljoscha Krettek
Date: 2017-08-22T14:44:22Z
[FLINK-2268] Don't use Hadoop Writable in JoinOperatorTest

commit da76ff8b5ec34b5e2d0d689892950262538bc384
Author: Aljoscha Krettek
Date: 2017-08-22T14:46:20Z
[FLINK-2268] Don't use commons-io ByteArrayOutputStream in NFATest
commons-io is only available as a transitive dependency of the Hadoop dependencies. We can just use the Java ByteArrayOutputStream and get rid of that dependency.

commit 5c9ee77075d43f6e7fe8b2ccad53f2c37ed896d6
Author: Aljoscha Krettek
Date: 2017-08-22T14:47:36Z
[FLINK-4048] Remove Hadoop GenericOptionsParser from ParameterTool
There are methods for this in flink-hadoop-compatibility.

commit 04ebf521d616e9370430a72d7054c6862bd96c3e
Author: Aljoscha Krettek
Date: 2017-08-22T14:50:18Z
[FLINK-2268] Don't use Hadoop FileSystem in RocksDB tests
This was in there for legacy reasons but is not required by the test.

commit 1c417fdcf8e7068a5eb6f8429e63067475bc6fc0
Author: Aljoscha Krettek
Date: 2017-08-22T14:51:53Z
[FLINK-2268] Don't use jets3t in MesosArtifactServer
This was only used for the enum for a specific HTTP response type. The jets3t dependency is only available as a transitive dependency of the Hadoop dependencies, which is why we remove it. 
commit 92d28b7182d428cfd5bd78cfa1f9cbbd521f95f0
Author: Aljoscha Krettek
Date: 2017-08-22T14:52:28Z
[FLINK-2268] Only print Hadoop env info if Hadoop is in the classpath

commit 80fe2708c773666c3a25add470b34b528ea6c75d
Author: Aljoscha Krettek
Date: 2017-08-22T14:54:33Z
[FLINK-2268] Close Hadoop FS reflectively in TestBaseUtils
This removes the dependency on Hadoop and ensures that we only close if Hadoop is available.

commit c876012e81fbc6224c1ff036707a4dcc684266e9
Author: Aljoscha Krettek
Date: 2017-08-24T12:38:02Z
[FLINK-2268] Remove Writable support from Scala TypeInformation Macro

commit 810fb147618678b70826129d1ba24d6b140d60dd
Author: Aljoscha Krettek
Date: 2017-08-24T12:22:26Z
[FLINK-2268] Dynamically load Hadoop security module when available

commit 384620a07b8483f811e802a72b763a9ca2f20c0a
Author: Aljoscha Krettek
Date: 2017-08-24T12:32:14Z
[FLINK-2268] Don't include Hadoop deps in flink-core/flink-java
This also makes them optional in flink-runtime, which is enabled by the previous changes to only use Hadoop dependencies if they are available. This also requires adding a few explicit dependencies in other modules because they were using transitive dependencies of the Hadoop deps. The most common dependency there is, ha!, commons-io.

commit 05497812bc5b8a7a8717ba8a7d052502b7d6386a
Author: Aljoscha Krettek
Date: