pan3793 commented on code in PR #4368:
URL: https://github.com/apache/kyuubi/pull/4368#discussion_r1163547576


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala:
##########
@@ -62,47 +70,100 @@ object FlinkEngineUtils extends Logging {
   def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
     
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
 
-  def parseCliOptions(args: Array[String]): CliOptions = {
-    val (mode, modeArgs) =
-      if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
-      else (args(0), args.drop(1))
-    val options = parseEmbeddedModeClient(modeArgs)
-    if (mode == MODE_EMBEDDED) {
-      if (options.isPrintHelp) {
-        printHelpEmbeddedModeClient()
+  /**
+   * Copied and modified from [[org.apache.flink.table.client.cli.CliOptionsParser]]
+   * to avoid loading flink-python classes, which we don't support yet.
+   */
+  private def discoverDependencies(
+      jars: java.util.List[URL],
+      libraries: java.util.List[URL]): java.util.List[URL] = {
+    val dependencies: java.util.List[URL] = new java.util.ArrayList[URL]
+    try { // find jar files
+      for (url <- jars) {
+        JarUtils.checkJarFile(url)
+        dependencies.add(url)
       }
-      options
+      // find jar files in library directories
+      for (libUrl <- libraries) {
+        val dir: File = new File(libUrl.toURI)
+        if (!dir.isDirectory) throw new SqlClientException("Directory 
expected: " + dir)
+        else if (!dir.canRead) throw new SqlClientException("Directory cannot 
be read: " + dir)
+        val files: Array[File] = dir.listFiles
+        if (files == null) throw new SqlClientException("Directory cannot be 
read: " + dir)
+        for (f <- files) { // only consider jars
+          if (f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar")) {
+            val url: URL = f.toURI.toURL
+            JarUtils.checkJarFile(url)
+            dependencies.add(url)
+          }
+        }
+      }
+    } catch {
+      case e: Exception =>
+        throw new SqlClientException("Could not load all required JAR files.", 
e)
+    }
+    dependencies
+  }
+
+  def getDefaultContext(
+      args: Array[String],
+      flinkConf: Configuration,
+      flinkConfDir: String): DefaultContext = {
+    val parser = new DefaultParser
+    val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+    val jars: java.util.List[URL] = Option(checkUrls(line, 
CliOptionsParser.OPTION_JAR))
+      .getOrElse(Collections.emptyList())
+    val libDirs: java.util.List[URL] = Option(checkUrls(line, 
CliOptionsParser.OPTION_LIBRARY))
+      .getOrElse(Collections.emptyList())
+    val dependencies: java.util.List[URL] = discoverDependencies(jars, libDirs)
+    if (FlinkEngineUtils.isFlinkVersionAtMost("1.16")) {

Review Comment:
   Use an exact version match here (e.g. `isFlinkVersionEqualTo("1.16")`) instead of `isFlinkVersionAtMost("1.16")`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to