pan3793 commented on code in PR #4368:
URL: https://github.com/apache/kyuubi/pull/4368#discussion_r1167741903
##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala:
##########
@@ -62,47 +70,105 @@ object FlinkEngineUtils extends Logging {
def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
- def parseCliOptions(args: Array[String]): CliOptions = {
- val (mode, modeArgs) =
- if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
- else (args(0), args.drop(1))
- val options = parseEmbeddedModeClient(modeArgs)
- if (mode == MODE_EMBEDDED) {
- if (options.isPrintHelp) {
- printHelpEmbeddedModeClient()
+ /**
+ * Copied and modified from
[[org.apache.flink.table.client.cli.CliOptionsParser]]
+ * to avoid loading flink-python classes which we doesn't support yet.
+ */
+ private def discoverDependencies(
+ jars: JList[URL],
+ libraries: JList[URL]): JList[URL] = {
+ val dependencies: JList[URL] = new JArrayList[URL]
+ try { // find jar files
+ for (url <- jars) {
+ JarUtils.checkJarFile(url)
+ dependencies.add(url)
}
- options
+ // find jar files in library directories
+ libraries.foreach { libUrl =>
+ val dir: File = new File(libUrl.toURI)
+ if (!dir.isDirectory) throw new SqlClientException(s"Directory
expected: $dir")
+ if (!dir.canRead) throw new SqlClientException(s"Directory cannot be
read: $dir")
+ val files: Array[File] = dir.listFiles
+ if (files == null) throw new SqlClientException(s"Directory cannot be
read: $dir")
+ for (f <- files.filter(f => f.isFile &&
f.getAbsolutePath.toLowerCase.endsWith(".jar"))) {
+ val url: URL = f.toURI.toURL
+ JarUtils.checkJarFile(url)
+ dependencies.add(url)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ throw new SqlClientException("Could not load all required JAR files.",
e)
+ }
+ dependencies
+ }
+
+ def getDefaultContext(
+ args: Array[String],
+ flinkConf: Configuration,
+ flinkConfDir: String): DefaultContext = {
+ val parser = new DefaultParser
+ val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+ val jars: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_JAR))
+ .getOrElse(JCollections.emptyList())
+ val libDirs: JList[URL] = Option(checkUrls(line,
CliOptionsParser.OPTION_LIBRARY))
+ .getOrElse(JCollections.emptyList())
+ val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ val commandLines: JList[CustomCommandLine] =
+ Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
+ DynConstructors.builder()
+ .impl(
+ classOf[DefaultContext],
+ classOf[Configuration],
+ classOf[JList[CustomCommandLine]])
+ .build()
+ .newInstance(flinkConf, commandLines)
+ .asInstanceOf[DefaultContext]
+ } else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
+ DynMethods.builder("load")
+ .impl(
+ classOf[DefaultContext],
+ classOf[Configuration],
+ classOf[JList[URL]],
+ classOf[Boolean],
+ classOf[Boolean])
+ .buildStatic()
+ .invoke[DefaultContext](
+ flinkConf,
+ dependencies,
+ new JBoolean(true),
+ new JBoolean(false))
} else {
- throw new SqlClientException("Other mode is not supported yet.")
+ throw new KyuubiException(
+ s"Flink version ${EnvironmentInformation.getVersion} are not supported
currently.")
}
}
- def getSessionContext(localExecutor: LocalExecutor, sessionId: String):
SessionContext = {
- val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext",
classOf[String])
- method.setAccessible(true)
- method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
+ def getSessionContext(session: Session): SessionContext = {
+ DynFields.builder()
+ .hiddenImpl(classOf[Session], "sessionContext")
+ .build()
+ .get(session)
+ .asInstanceOf[SessionContext]
}
- def parseEmbeddedModeClient(args: Array[String]): CliOptions =
+ def getResultJobId(resultFetch: ResultFetcher): JobID = {
Review Comment:
Generally, we should use `Option` to represent nullable value as much as
possible
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]