pan3793 commented on code in PR #4368:
URL: https://github.com/apache/kyuubi/pull/4368#discussion_r1167741903


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala:
##########
@@ -62,47 +70,105 @@ object FlinkEngineUtils extends Logging {
   def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
     
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
 
-  def parseCliOptions(args: Array[String]): CliOptions = {
-    val (mode, modeArgs) =
-      if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
-      else (args(0), args.drop(1))
-    val options = parseEmbeddedModeClient(modeArgs)
-    if (mode == MODE_EMBEDDED) {
-      if (options.isPrintHelp) {
-        printHelpEmbeddedModeClient()
+  /**
+   * Copied and modified from 
[[org.apache.flink.table.client.cli.CliOptionsParser]]
+   * to avoid loading flink-python classes which we doesn't support yet.
+   */
+  private def discoverDependencies(
+      jars: JList[URL],
+      libraries: JList[URL]): JList[URL] = {
+    val dependencies: JList[URL] = new JArrayList[URL]
+    try { // find jar files
+      for (url <- jars) {
+        JarUtils.checkJarFile(url)
+        dependencies.add(url)
       }
-      options
+      // find jar files in library directories
+      libraries.foreach { libUrl =>
+        val dir: File = new File(libUrl.toURI)
+        if (!dir.isDirectory) throw new SqlClientException(s"Directory 
expected: $dir")
+        if (!dir.canRead) throw new SqlClientException(s"Directory cannot be 
read: $dir")
+        val files: Array[File] = dir.listFiles
+        if (files == null) throw new SqlClientException(s"Directory cannot be 
read: $dir")
+        for (f <- files.filter(f => f.isFile && 
f.getAbsolutePath.toLowerCase.endsWith(".jar"))) {
+          val url: URL = f.toURI.toURL
+          JarUtils.checkJarFile(url)
+          dependencies.add(url)
+        }
+      }
+    } catch {
+      case e: Exception =>
+        throw new SqlClientException("Could not load all required JAR files.", 
e)
+    }
+    dependencies
+  }
+
+  def getDefaultContext(
+      args: Array[String],
+      flinkConf: Configuration,
+      flinkConfDir: String): DefaultContext = {
+    val parser = new DefaultParser
+    val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+    val jars: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_JAR))
+      .getOrElse(JCollections.emptyList())
+    val libDirs: JList[URL] = Option(checkUrls(line, 
CliOptionsParser.OPTION_LIBRARY))
+      .getOrElse(JCollections.emptyList())
+    val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      val commandLines: JList[CustomCommandLine] =
+        Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
+      DynConstructors.builder()
+        .impl(
+          classOf[DefaultContext],
+          classOf[Configuration],
+          classOf[JList[CustomCommandLine]])
+        .build()
+        .newInstance(flinkConf, commandLines)
+        .asInstanceOf[DefaultContext]
+    } else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
+      DynMethods.builder("load")
+        .impl(
+          classOf[DefaultContext],
+          classOf[Configuration],
+          classOf[JList[URL]],
+          classOf[Boolean],
+          classOf[Boolean])
+        .buildStatic()
+        .invoke[DefaultContext](
+          flinkConf,
+          dependencies,
+          new JBoolean(true),
+          new JBoolean(false))
     } else {
-      throw new SqlClientException("Other mode is not supported yet.")
+      throw new KyuubiException(
+        s"Flink version ${EnvironmentInformation.getVersion} are not supported 
currently.")
     }
   }
 
-  def getSessionContext(localExecutor: LocalExecutor, sessionId: String): 
SessionContext = {
-    val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext", 
classOf[String])
-    method.setAccessible(true)
-    method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
+  def getSessionContext(session: Session): SessionContext = {
+    DynFields.builder()
+      .hiddenImpl(classOf[Session], "sessionContext")
+      .build()
+      .get(session)
+      .asInstanceOf[SessionContext]
   }
 
-  def parseEmbeddedModeClient(args: Array[String]): CliOptions =
+  def getResultJobId(resultFetch: ResultFetcher): JobID = {

Review Comment:
   Generally, we should use `Option` to represent nullable value as much as 
possible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to