pan3793 commented on code in PR #4368:
URL: https://github.com/apache/kyuubi/pull/4368#discussion_r1166769103
##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala:
##########
@@ -62,47 +70,107 @@ object FlinkEngineUtils extends Logging {
def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
- def parseCliOptions(args: Array[String]): CliOptions = {
- val (mode, modeArgs) =
- if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
- else (args(0), args.drop(1))
- val options = parseEmbeddedModeClient(modeArgs)
- if (mode == MODE_EMBEDDED) {
- if (options.isPrintHelp) {
- printHelpEmbeddedModeClient()
+ /**
+ * Copied and modified from
[[org.apache.flink.table.client.cli.CliOptionsParser]]
+ * to avoid loading flink-python classes which we doesn't support yet.
+ */
+ private def discoverDependencies(
+ jars: java.util.List[URL],
+ libraries: java.util.List[URL]): java.util.List[URL] = {
+ val dependencies: java.util.List[URL] = new java.util.ArrayList[URL]
+ try { // find jar files
+ for (url <- jars) {
+ JarUtils.checkJarFile(url)
+ dependencies.add(url)
}
- options
+ // find jar files in library directories
+ for (libUrl <- libraries) {
+ val dir: File = new File(libUrl.toURI)
+ if (!dir.isDirectory) throw new SqlClientException("Directory
expected: " + dir)
+ else if (!dir.canRead) throw new SqlClientException("Directory cannot
be read: " + dir)
+ val files: Array[File] = dir.listFiles
+ if (files == null) throw new SqlClientException("Directory cannot be
read: " + dir)
+ for (f <- files) { // only consider jars
+ if (f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar")) {
+ val url: URL = f.toURI.toURL
+ JarUtils.checkJarFile(url)
+ dependencies.add(url)
+ }
+ }
+ }
+ } catch {
+ case e: Exception =>
+ throw new SqlClientException("Could not load all required JAR files.",
e)
+ }
+ dependencies
+ }
+
+ def getDefaultContext(
+ args: Array[String],
+ flinkConf: Configuration,
+ flinkConfDir: String): DefaultContext = {
+ val parser = new DefaultParser
+ val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+ val jars: java.util.List[URL] = Option(checkUrls(line,
CliOptionsParser.OPTION_JAR))
+ .getOrElse(Collections.emptyList())
+ val libDirs: java.util.List[URL] = Option(checkUrls(line,
CliOptionsParser.OPTION_LIBRARY))
+ .getOrElse(Collections.emptyList())
+ val dependencies: java.util.List[URL] = discoverDependencies(jars, libDirs)
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ val commandLines: java.util.List[CustomCommandLine] =
+ Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
+ DynConstructors.builder()
+ .impl(
+ classOf[DefaultContext],
+ classOf[Configuration],
+ classOf[java.util.List[CustomCommandLine]])
+ .build()
+ .newInstance(flinkConf, commandLines)
+ .asInstanceOf[DefaultContext]
+ } else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
+ DynMethods.builder("load")
+ .impl(
+ classOf[DefaultContext],
+ classOf[Configuration],
+ classOf[java.util.List[URL]],
+ JavaPrimitiveClasses.CLASS_PRIMITIVE_BOOLEAN,
+ JavaPrimitiveClasses.CLASS_PRIMITIVE_BOOLEAN)
+ .buildStatic()
+ .invoke[DefaultContext](
+ flinkConf,
+ dependencies,
+ new java.lang.Boolean(true),
Review Comment:
let's use `JBoolean` or `JBool` instead
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]