jiaoqingbo commented on code in PR #2418:
URL: https://github.com/apache/incubator-kyuubi/pull/2418#discussion_r857052247
##########
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala:
##########
@@ -40,44 +42,71 @@ class FlinkProcessBuilder(
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
- override protected def executable: String = {
- val flinkEngineHomeOpt = env.get("FLINK_ENGINE_HOME").orElse {
- val cwd = Utils.getCodeSourceLocation(getClass)
- .split("kyuubi-server")
- assert(cwd.length > 1)
- Option(
- Paths.get(cwd.head)
- .resolve("externals")
- .resolve("kyuubi-flink-sql-engine")
- .toFile)
- .map(_.getAbsolutePath)
- }
-
- flinkEngineHomeOpt.map { dir =>
- Paths.get(dir, "bin",
FLINK_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
- } getOrElse {
- throw KyuubiSQLException("FLINK_ENGINE_HOME is not set! " +
- "For more detail information on installing and configuring Flink,
please visit " +
-
"https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
- }
- }
-
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String =
"org.apache.kyuubi.engine.flink.FlinkSQLEngine"
override protected def childProcEnv: Map[String, String] = conf.getEnvs +
("FLINK_HOME" -> FLINK_HOME) +
- ("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf") +
- ("FLINK_SQL_ENGINE_JAR" -> mainResource.get) +
- ("FLINK_SQL_ENGINE_DYNAMIC_ARGS" ->
- conf.getAll.filter { case (k, _) =>
- k.startsWith("kyuubi.") || k.startsWith("flink.") ||
- k.startsWith("hadoop.") || k.startsWith("yarn.")
- }.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
-
- override protected def commands: Array[String] = Array(executable)
+ ("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf")
+
+ override protected def commands: Array[String] = {
+ val buffer = new ArrayBuffer[String]()
+ buffer += executable
+
+ // TODO: How shall we deal with proxyUser,
+ // user.name
+ // kyuubi.session.user
+ // or just leave it, because we can handle it at operation layer
+ buffer += s"-D$KYUUBI_SESSION_USER_KEY=$proxyUser"
+
+ // TODO: add Kyuubi.engineEnv.FLINK_ENGINE_MEMORY or
kyuubi.engine.flink.memory to configure
+ // -Xmx5g
+ // java options
+ for ((k, v) <- conf.getAll) {
+ buffer += s"-D$k=$v"
+ }
+ buffer += "-cp"
+ val classpathEntries = new LinkedHashSet[String]
+ // flink engine runtime jar
+ mainResource.foreach(classpathEntries.add)
+ // flink sql client jar
+ val flinkSqlClientPath = Paths.get(FLINK_HOME)
+ .resolve("opt")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.startsWith("flink-sql-client")
+ }
+ }).head.getAbsolutePath
+ classpathEntries.add(flinkSqlClientPath)
+
+ // jars from flink lib
+ classpathEntries.add(s"$FLINK_HOME${File.separator}lib${File.separator}*")
+
+ // classpath contains flink configurations, default to flink.home/conf
+ classpathEntries.add(env.getOrElse("FLINK_CONF_DIR",
s"$FLINK_HOME${File.separator}conf"))
+ // classpath contains hadoop configurations
+ var hadoopConfDir = env.get("HADOOP_CONF_DIR")
+ val hadoopClassPath = env.get("HADOOP_CLASSPATH")
+ if (hadoopConfDir.isEmpty && hadoopClassPath.isEmpty) {
+ val defaultHadoopConfDir =
s"${File.separator}etc${File.separator}hadoop${File.separator}conf"
Review Comment:
> IMO, when the `hadoopConfDir` and `hadoopClassPath` is empty, the
operation that the hadoop conf is setting to `/etc/hadoop/confg` at default is
confusing for users, because the hadoop conf directory is logged in the ouput
of the Flink engine bootstrap procedure.
ok I will delete this
##########
kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala:
##########
@@ -17,16 +17,105 @@
package org.apache.kyuubi.engine.flink
-import org.apache.kyuubi.KyuubiFunSuite
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.LinkedHashSet
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite,
SCALA_COMPILE_VERSION}
import org.apache.kyuubi.config.KyuubiConf
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def conf = KyuubiConf().set("kyuubi.on", "off")
+ private def envDefault: Map[String, String] = Map(
+ "JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
+ private def confStr: String = {
+ val configBuffer = new ArrayBuffer[String]()
+ for ((k, v) <- conf.getAll) {
+ configBuffer += s"-D$k=$v"
+ }
+ configBuffer.toArray.mkString(" ")
+ }
+ private val javaPath =
s"${envDefault("JAVA_HOME")}${File.separator}bin${File.separator}java"
+ private val flinkSqlClientJarPathSuffix =
s"${File.separator}opt${File.separator}" +
+ s"flink-sql-client_$SCALA_COMPILE_VERSION-$FLINK_COMPILE_VERSION.jar"
+ private val flinkLibPathSuffix = s"${File.separator}lib${File.separator}*"
+ private val flinkConfPathSuffix = s"${File.separator}conf"
+ private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
test("flink engine process builder") {
- val builder = new FlinkProcessBuilder("vinoyang", conf)
- val commands = builder.toString.split(' ')
- assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
+ val builder = new FlinkProcessBuilder("vinoyang", conf) {
+ override protected def env: Map[String, String] = envDefault +
+ ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf")
+
+ ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf") +
+ ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
+ ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+ }
+ val commands = builder.toString
+
+ val classpathEntries = new LinkedHashSet[String]
+ builder.mainResource.foreach(classpathEntries.add)
+ val flinkSqlClientJarPath =
s"${builder.FLINK_HOME}$flinkSqlClientJarPathSuffix"
+ val flinkLibPath = s"${builder.FLINK_HOME}$flinkLibPathSuffix"
+ val flinkConfPath = s"${builder.FLINK_HOME}$flinkConfPathSuffix"
+ val hadoopConfPath = s"${File.separator}hadoop${File.separator}conf"
Review Comment:
> Could this be optimized to iterate the above map for env?
ok
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]