juiceyang opened a new issue, #7018:
URL: https://github.com/apache/kyuubi/issues/7018

   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [x] I have searched in the 
[issues](https://github.com/apache/kyuubi/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Describe the bug
   
   According to the Kyuubi documentation, the configuration [`kyuubi.session.engine.flink.fetch.timeout`](https://kyuubi.readthedocs.io/en/v1.10.0/configuration/settings.html#:~:text=kyuubi.session.engine.flink.fetch.timeout) is described as:
   
   > Result fetch timeout for Flink engine. If the timeout is reached, the 
result fetch would be stopped and the current fetched would be returned. If no 
data are fetched, a TimeoutException would be thrown.
   
   However, when querying a low-traffic Flink source through Kyuubi, the query does not return the rows already fetched when the configured fetch timeout is reached, which contradicts the documented behavior.
   
   ### How to reproduce
   
   #### Environment:
   - Kyuubi 1.10.1 (server; the JDBC client is 1.10.0, per the launch command in the logs below)
   - Flink 1.20.1 
   - Hadoop 3.1.3
   
   #### Steps
   1. Start the Kyuubi server from the Kyuubi root: `bin/kyuubi start`
   2. Start a Flink session cluster from the Flink root: `bin/start-cluster.sh`
   3. Establish a JDBC connection to the Kyuubi server with the Kyuubi JDBC client, with the fetch timeout set to 30 seconds.
   4. Create a Flink DataGen source table with low traffic (1 row per second).
   5. Submit a simple SELECT statement against that DataGen table and incrementally print the query result. Sample code is shown below.
   ```java
   package com.example;
   
   import java.sql.Connection;
   import java.sql.DriverManager;
   import java.sql.ResultSet;
   import java.sql.ResultSetMetaData;
   import java.sql.SQLException;
   import java.sql.Statement;
   import java.time.Instant;
   
   public class LocalConnectionTest {
     private static final String CONNECTION_URL =
         
"jdbc:kyuubi://localhost:10009/default?kyuubi.engine.type=FLINK_SQL;kyuubi.engine.share.level=CONNECTION;kyuubi.session.engine.flink.fetch.timeout=PT30S";
   
     private static Connection newKyuubiConnection() throws Exception {
       return DriverManager.getConnection(CONNECTION_URL);
     }
   
     private static void printResultSet(ResultSet rs) throws SQLException {
       ResultSetMetaData rsmd = rs.getMetaData();
       int columnCount = rsmd.getColumnCount();
   
       System.out.print("[" + Instant.now().toString() + "]\t");
       for (int i = 1; i <= columnCount; i++) {
         System.out.print(rsmd.getColumnName(i) + "\t");
       }
       System.out.println();
       
        // rs.next() does not return even after the fetch timeout elapses
       while (rs.next()) {
         System.out.print("[" + Instant.now().toString() + "]\t");
         for (int i = 1; i <= columnCount; i++) {
           System.out.print(rs.getString(i) + "\t");
         }
         System.out.println();
       }
     }
   
     public static void main(String[] args) throws Exception {
       String createDataGen =
           "CREATE TABLE orders (\n"
               + "    order_number BIGINT\n"
               + ") WITH (\n"
               + "  'connector' = 'datagen',\n"
               + "  'rows-per-second' = '1'\n"
               + ");";
   
       String selectFromDataGen = "select * from orders";
   
       try (Connection connection = newKyuubiConnection()) {
         // first, create a low traffic DataGen source table
         connection.createStatement().executeQuery(createDataGen);
         Statement statement = connection.createStatement();
         // then, query the low traffic DataGen table
         ResultSet rs = statement.executeQuery(selectFromDataGen);
         printResultSet(rs);
       }
     }
   }
   ```
   
   #### Expected result:
   
   Since the default fetch size is 1000 and the DataGen source emits 1 row per second, filling the fetch buffer in [this while loop](https://github.com/apache/kyuubi/blob/9047151d8dd24cb5834770c9b216438838b7f9ed/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala#L117) would take about 1000 seconds, far longer than the 30-second fetch timeout.
   
   So each blocking ResultSet::next call is expected to return about 30 seconds after it starts waiting, with roughly 30 rows of results fetched in that round.
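   
   To make this expectation concrete, here is a small timing probe (a hypothetical helper, not part of the reproducer above) that prints how long each ResultSet::next call blocks; with the documented behavior, the first call of each fetch round should return after roughly 30 seconds:
   ```java
   import java.sql.ResultSet;
   import java.sql.SQLException;
   
   public class FetchTimingProbe {
     /** Prints how long each ResultSet.next() call blocks, in seconds. */
     static void probe(ResultSet rs, int maxRows) throws SQLException {
       for (int i = 0; i < maxRows; i++) {
         long start = System.nanoTime();
         if (!rs.next()) {
           break; // end of stream
         }
         long waitedSec = (System.nanoTime() - start) / 1_000_000_000L;
         // Expected: ~30s for the first row of each fetch round (the fetch
         // timeout), near 0s for the remaining ~29 rows of the same batch.
         System.out.printf("row %d returned after %ds%n", i, waitedSec);
       }
     }
   }
   ```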
   
   #### Actual result:
   
   The main thread stays blocked in the ResultSet::next call and does not return even after the 30-second fetch timeout is reached.
   
   
   ### Affects Version(s)
   
   1.10.1
   
   ### Kyuubi Server Log Output
   
   ```logtalk
   2025-04-11 15:22:41.076 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.operation.LaunchEngine: Processing anonymous's 
query[f29903b6-d303-4011-8026-d666de85e373]: PENDING_STATE -> RUNNING_STATE, 
statement:
   LaunchEngine
   2025-04-11 15:22:41.076 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.session.KyuubiSessionManager: anonymous's KyuubiSessionImpl 
with SessionHandle [44b7cd3b-e2cc-4a4b-8427-6d0d27d30617] is opened, current 
opening sessions 1
   2025-04-11 15:22:41.076 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=f29903b6-d303-4011-8026-d666de85e373      opType=LaunchEngine     
state=RUNNING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:41.077 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.shaded.curator.framework.imps.CuratorFrameworkImpl: Starting
   2025-04-11 15:22:41.077 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.shaded.zookeeper.ZooKeeper: Initiating client connection, 
connectString=10.3.112.185:2181 sessionTimeout=60000 
watcher=org.apache.kyuubi.shaded.curator.ConnectionState@615cdfa2
   2025-04-11 15:22:41.079 INFO KyuubiSessionManager-exec-pool: 
Thread-62-SendThread(10.3.112.185:2181) 
org.apache.kyuubi.shaded.zookeeper.ClientCnxn: Opening socket connection to 
server 10.3.112.185/10.3.112.185:2181. Will not attempt to authenticate using 
SASL (unknown error)
   2025-04-11 15:22:41.079 INFO KyuubiSessionManager-exec-pool: 
Thread-62-SendThread(10.3.112.185:2181) 
org.apache.kyuubi.shaded.zookeeper.ClientCnxn: Socket connection established to 
10.3.112.185/10.3.112.185:2181, initiating session
   2025-04-11 15:22:41.079 INFO NIOServerCxn.Factory:/10.3.112.185:2181 
org.apache.kyuubi.shaded.zookeeper.server.NIOServerCnxnFactory: Accepted socket 
connection from /10.3.112.185:52033
   2025-04-11 15:22:41.079 INFO NIOServerCxn.Factory:/10.3.112.185:2181 
org.apache.kyuubi.shaded.zookeeper.server.ZooKeeperServer: Client attempting to 
establish new session at /10.3.112.185:52033
   2025-04-11 15:22:41.080 INFO SyncThread:0 
org.apache.kyuubi.shaded.zookeeper.server.ZooKeeperServer: Established session 
0x1003886d20d0001 with negotiated timeout 60000 for client /10.3.112.185:52033
   2025-04-11 15:22:41.080 INFO KyuubiSessionManager-exec-pool: 
Thread-62-SendThread(10.3.112.185:2181) 
org.apache.kyuubi.shaded.zookeeper.ClientCnxn: Session establishment complete 
on server 10.3.112.185/10.3.112.185:2181, sessionid = 0x1003886d20d0001, 
negotiated timeout = 60000
   2025-04-11 15:22:41.080 INFO KyuubiSessionManager-exec-pool: 
Thread-62-EventThread 
org.apache.kyuubi.shaded.curator.framework.state.ConnectionStateManager: State 
change: CONNECTED
   2025-04-11 15:22:41.143 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.engine.EngineRef: Launching engine:
   /Users/juiceyang/Tools/amazon-corretto-1.8.jdk/Contents/Home/bin/java \
        -Xmx1g \
        -cp 
/Users/juiceyang/Tools/apache-kyuubi-1.10.1-bin/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.10.1.jar:/Users/juiceyang/Tools/flink-1.20.1/opt/flink-sql-client-1.20.1.jar:/Users/juiceyang/Tools/flink-1.20.1/opt/flink-sql-gateway-1.20.1.jar:/Users/juiceyang/Tools/flink-1.20.1//lib/*:/Users/juiceyang/Tools/flink-1.20.1//conf:/Users/juiceyang/Tools/hadoop-3.1.3/etc/hadoop:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/common/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/common/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/mapreduce/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn/*
    org.apache.kyuubi.engine.flink.FlinkSQLEngine \
        --conf kyuubi.session.user=anonymous \
        --conf 
flink.app.name=kyuubi_CONNECTION_FLINK_SQL_anonymous_44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
 \
        --conf hive.server2.thrift.resultset.default.fetch.size=1000 \
        --conf kyuubi.client.ipAddress=10.3.112.185 \
        --conf kyuubi.client.version=1.10.0 \
        --conf 
kyuubi.engine.appMgrInfo=eyJyZXNvdXJjZU1hbmFnZXIiOm51bGwsImt1YmVybmV0ZXNJbmZvIjp7ImNvbnRleHQiOm51bGwsIm5hbWVzcGFjZSI6bnVsbH19
 \
        --conf 
kyuubi.engine.flink.extra.classpath=/Users/juiceyang/Tools/hadoop-3.1.3/etc/hadoop:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/common/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/common/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/mapreduce/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn/*
 \
        --conf kyuubi.engine.share.level=CONNECTION \
        --conf kyuubi.engine.submit.time=1744356161084 \
        --conf kyuubi.engine.type=FLINK_SQL \
        --conf kyuubi.frontend.protocols=THRIFT_BINARY,REST \
        --conf kyuubi.ha.addresses=10.3.112.185:2181 \
        --conf kyuubi.ha.engine.ref.id=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617 \
        --conf 
kyuubi.ha.namespace=/kyuubi_1.10.1_CONNECTION_FLINK_SQL/anonymous/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
 \
        --conf kyuubi.ha.zookeeper.auth.type=NONE \
        --conf kyuubi.server.ipAddress=127.0.0.1 \
        --conf kyuubi.session.connection.url=localhost:10009 \
        --conf kyuubi.session.engine.flink.fetch.timeout=PT30S \
        --conf kyuubi.session.real.user=anonymous
   2025-04-11 15:22:41.145 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.engine.ProcBuilder: Logging to 
/Users/juiceyang/Tools/apache-kyuubi-1.10.1-bin/work/anonymous/kyuubi-flink-sql-engine.log.0
   2025-04-11 15:22:47.019 INFO NIOServerCxn.Factory:/10.3.112.185:2181 
org.apache.kyuubi.shaded.zookeeper.server.NIOServerCnxnFactory: Accepted socket 
connection from /10.3.112.185:52041
   2025-04-11 15:22:47.020 INFO NIOServerCxn.Factory:/10.3.112.185:2181 
org.apache.kyuubi.shaded.zookeeper.server.ZooKeeperServer: Client attempting to 
establish new session at /10.3.112.185:52041
   2025-04-11 15:22:47.020 INFO SyncThread:0 
org.apache.kyuubi.shaded.zookeeper.server.ZooKeeperServer: Established session 
0x1003886d20d0002 with negotiated timeout 60000 for client /10.3.112.185:52041
   2025-04-11 15:22:47.025 INFO ProcessThread(sid:0 cport:2181): 
org.apache.kyuubi.shaded.zookeeper.server.PrepRequestProcessor: Got user-level 
KeeperException when processing sessionid:0x1003886d20d0002 type:create 
cxid:0x1 zxid:0x6 txntype:-1 reqpath:n/a Error 
Path:/kyuubi_1.10.1_CONNECTION_FLINK_SQL/anonymous Error:KeeperErrorCode = 
NoNode for /kyuubi_1.10.1_CONNECTION_FLINK_SQL/anonymous
   2025-04-11 15:22:47.227 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient: Get service 
instance:10.3.112.185:52040 and version:1.10.1 under 
/kyuubi_1.10.1_CONNECTION_FLINK_SQL/anonymous/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.639 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.session.KyuubiSessionImpl: [anonymous:10.3.112.185] 
SessionHandle [44b7cd3b-e2cc-4a4b-8427-6d0d27d30617] - Connected to engine 
[10.3.112.185:52040]/[] with SessionHandle 
[44b7cd3b-e2cc-4a4b-8427-6d0d27d30617]]
   2025-04-11 15:22:47.640 INFO Curator-Framework-0 
org.apache.kyuubi.shaded.curator.framework.imps.CuratorFrameworkImpl: 
backgroundOperationsLoop exiting
   2025-04-11 15:22:47.641 INFO ProcessThread(sid:0 cport:2181): 
org.apache.kyuubi.shaded.zookeeper.server.PrepRequestProcessor: Processed 
session termination for sessionid: 0x1003886d20d0001
   2025-04-11 15:22:47.641 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.shaded.zookeeper.ZooKeeper: Session: 0x1003886d20d0001 closed
   2025-04-11 15:22:47.642 INFO NIOServerCxn.Factory:/10.3.112.185:2181 
org.apache.kyuubi.shaded.zookeeper.server.NIOServerCnxn: Closed socket 
connection for client /10.3.112.185:52033 which had sessionid 0x1003886d20d0001
   2025-04-11 15:22:47.642 INFO KyuubiSessionManager-exec-pool: 
Thread-62-EventThread org.apache.kyuubi.shaded.zookeeper.ClientCnxn: 
EventThread shut down for session: 0x1003886d20d0001
   2025-04-11 15:22:47.642 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.operation.LaunchEngine: Processing anonymous's 
query[f29903b6-d303-4011-8026-d666de85e373]: RUNNING_STATE -> FINISHED_STATE, 
time taken: 6.566 seconds
   2025-04-11 15:22:47.642 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=f29903b6-d303-4011-8026-d666de85e373      opType=LaunchEngine     
state=FINISHED  user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.685 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=74b9f4ae-858b-4ee8-b8c2-c43db5053ca4   opType=ExecuteStatement 
state=INITIALIZED       user=anonymous  
session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.686 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.operation.log.OperationLog: Creating operation log file 
/Users/juiceyang/Tools/apache-kyuubi-1.10.1-bin/work/server_operation_logs/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617/74b9f4ae-858b-4ee8-b8c2-c43db5053ca4
   2025-04-11 15:22:47.686 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.session.KyuubiSessionImpl: [anonymous:10.3.112.185] 
SessionHandle [44b7cd3b-e2cc-4a4b-8427-6d0d27d30617] - Starting to wait the 
launch engine operation finished
   2025-04-11 15:22:47.686 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.session.KyuubiSessionImpl: [anonymous:10.3.112.185] 
SessionHandle [44b7cd3b-e2cc-4a4b-8427-6d0d27d30617] - Engine has been 
launched, elapsed time: 0 s
   2025-04-11 15:22:47.686 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=74b9f4ae-858b-4ee8-b8c2-c43db5053ca4   opType=ExecuteStatement 
state=PENDING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.733 INFO KyuubiSessionManager-exec-pool: Thread-69 
org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's 
query[74b9f4ae-858b-4ee8-b8c2-c43db5053ca4]: PENDING_STATE -> RUNNING_STATE, 
statement:
   CREATE TABLE orders (
       order_number BIGINT
   ) WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1'
   );
   2025-04-11 15:22:47.734 INFO KyuubiSessionManager-exec-pool: Thread-69 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=74b9f4ae-858b-4ee8-b8c2-c43db5053ca4      opType=ExecuteStatement 
state=RUNNING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.748 INFO KyuubiSessionManager-exec-pool: Thread-69 
org.apache.kyuubi.operation.ExecuteStatement: 
Query[74b9f4ae-858b-4ee8-b8c2-c43db5053ca4] in FINISHED_STATE
   2025-04-11 15:22:47.748 INFO KyuubiSessionManager-exec-pool: Thread-69 
org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's 
query[74b9f4ae-858b-4ee8-b8c2-c43db5053ca4]: RUNNING_STATE -> FINISHED_STATE, 
time taken: 0.014 seconds
   2025-04-11 15:22:47.748 INFO KyuubiSessionManager-exec-pool: Thread-69 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=74b9f4ae-858b-4ee8-b8c2-c43db5053ca4      opType=ExecuteStatement 
state=FINISHED  user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.797 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=7cadd034-9207-4bac-9f9d-f0fbc9dc7611   opType=ExecuteStatement 
state=INITIALIZED       user=anonymous  
session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.797 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.operation.log.OperationLog: Creating operation log file 
/Users/juiceyang/Tools/apache-kyuubi-1.10.1-bin/work/server_operation_logs/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617/7cadd034-9207-4bac-9f9d-f0fbc9dc7611
   2025-04-11 15:22:47.797 INFO KyuubiTBinaryFrontendHandler-Pool: Thread-61 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=7cadd034-9207-4bac-9f9d-f0fbc9dc7611   opType=ExecuteStatement 
state=PENDING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:48.523 INFO KyuubiSessionManager-exec-pool: Thread-70 
org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's 
query[7cadd034-9207-4bac-9f9d-f0fbc9dc7611]: PENDING_STATE -> RUNNING_STATE, 
statement:
   select * from orders
   2025-04-11 15:22:48.523 INFO KyuubiSessionManager-exec-pool: Thread-70 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=7cadd034-9207-4bac-9f9d-f0fbc9dc7611      opType=ExecuteStatement 
state=RUNNING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:48.524 INFO KyuubiSessionManager-exec-pool: Thread-70 
org.apache.kyuubi.operation.ExecuteStatement: 
Query[7cadd034-9207-4bac-9f9d-f0fbc9dc7611] in FINISHED_STATE
   2025-04-11 15:22:48.524 INFO KyuubiSessionManager-exec-pool: Thread-70 
org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's 
query[7cadd034-9207-4bac-9f9d-f0fbc9dc7611]: RUNNING_STATE -> FINISHED_STATE, 
time taken: 0.001 seconds
   2025-04-11 15:22:48.524 INFO KyuubiSessionManager-exec-pool: Thread-70 
org.apache.kyuubi.operation.OperationAuditLogger: 
operation=7cadd034-9207-4bac-9f9d-f0fbc9dc7611      opType=ExecuteStatement 
state=FINISHED  user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   ```
   
   ### Kyuubi Engine Log Output
   
   ```logtalk
   2025-04-11 15:22:47.031 INFO main 
org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient: Created a 
/kyuubi_1.10.1_CONNECTION_FLINK_SQL/anonymous/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617/serverUri=10.3.112.185:52040;version=1.10.1;kyuubi.engine.appMgrInfo=eyJyZXNvdXJjZU1hbmFnZXIiOm51bGwsImt1YmVybmV0ZXNJbmZvIjp7ImNvbnRleHQiOm51bGwsIm5hbWVzcGFjZSI6bnVsbH19;refId=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617;sequence=0000000000
 on ZooKeeper for KyuubiServer uri: 10.3.112.185:52040
   2025-04-11 15:22:47.032 INFO main 
org.apache.kyuubi.ha.client.EngineServiceDiscovery: Registered 
EngineServiceDiscovery in namespace 
/kyuubi_1.10.1_CONNECTION_FLINK_SQL/anonymous/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617.
   2025-04-11 15:22:47.032 INFO main 
org.apache.kyuubi.ha.client.EngineServiceDiscovery: 
Service[EngineServiceDiscovery] is started.
   2025-04-11 15:22:47.032 INFO main 
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService: 
Service[FlinkThriftBinaryFrontendService] is started.
   2025-04-11 15:22:47.032 INFO main 
org.apache.kyuubi.engine.flink.FlinkSQLEngine: Service[FlinkSQLEngine] is 
started.
   2025-04-11 15:22:47.040 INFO main 
org.apache.kyuubi.engine.flink.FlinkSQLEngine: Flink engine started
   2025-04-11 15:22:47.244 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService: Client 
protocol version: HIVE_CLI_SERVICE_PROTOCOL_V10
   2025-04-11 15:22:47.246 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager: 
Opening session for anonymous@10.3.112.185
   2025-04-11 15:22:47.278 WARN FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil: 
Workflow scheduler options do not contain an option key 
'workflow-scheduler.type' for discovering an workflow scheduler.
   2025-04-11 15:22:47.279 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.flink.table.gateway.service.session.SessionManagerImpl: 
Session 1b093bb3-96ee-46fa-94c1-83330623727a is opened, and the number of 
current sessions is 1.
   2025-04-11 15:22:47,449 FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 INFO Log4j appears to be running in a Servlet environment, but 
there's no log4j-web module available. If you want better web container 
support, please add the log4j-web JAR to your web archive or server lib 
directory.
   2025-04-11 15:22:47,456 FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 WARN No Log4j 2 configuration file found. Using default configuration 
(logging only errors to the console), or user programmatically provided 
configurations. Set system property 'log4j2.debug' to show Log4j 2 internal 
initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions 
on how to configure Log4j 2
   2025-04-11 15:22:47,456 FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 INFO Log4j appears to be running in a Servlet environment, but 
there's no log4j-web module available. If you want better web container 
support, please add the log4j-web JAR to your web archive or server lib 
directory.
   2025-04-11 15:22:47.613 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlCreateCatalog does not contain a setter for 
field catalogName
   2025-04-11 15:22:47.613 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlCreateCatalog cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions does not contain a 
setter for field propertyList
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset does not contain a getter 
for field propertyKeyList
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset does not contain a setter 
for field propertyKeyList
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment does not contain a 
setter for field comment
   2025-04-11 15:22:47.614 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.615 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlCreateView does not contain a setter for 
field viewName
   2025-04-11 15:22:47.615 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlCreateView cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.615 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterViewRename does not contain a getter 
for field newViewIdentifier
   2025-04-11 15:22:47.615 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterViewRename does not contain a setter 
for field newViewIdentifier
   2025-04-11 15:22:47.616 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterViewRename cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.616 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterViewProperties does not contain a 
setter for field propertyList
   2025-04-11 15:22:47.616 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterViewProperties cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.616 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterViewAs does not contain a setter for 
field newQuery
   2025-04-11 15:22:47.616 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterViewAs cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.617 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAddPartitions does not contain a setter for 
field ifPartitionNotExists
   2025-04-11 15:22:47.617 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAddPartitions cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.617 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlDropPartitions does not contain a setter for 
field ifExists
   2025-04-11 15:22:47.617 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlDropPartitions cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowPartitions does not contain a getter for 
field tableIdentifier
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowPartitions does not contain a setter for 
field tableIdentifier
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlShowPartitions cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dml.SqlTruncateTable does not contain a getter for 
field tableNameIdentifier
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dml.SqlTruncateTable does not contain a setter for 
field tableNameIdentifier
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dml.SqlTruncateTable cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowFunctions does not contain a setter for 
field requireUser
   2025-04-11 15:22:47.618 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlShowFunctions cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.619 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowProcedures does not contain a getter for 
field databaseName
   2025-04-11 15:22:47.619 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowProcedures does not contain a setter for 
field databaseName
   2025-04-11 15:22:47.619 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlShowProcedures cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.619 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlReplaceTableAs does not contain a setter for 
field tableName
   2025-04-11 15:22:47.619 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlReplaceTableAs cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowDatabases does not contain a setter for 
field preposition
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlShowDatabases cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowCreateCatalog does not contain a getter 
for field sqlIdentifier
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlShowCreateCatalog does not contain a setter 
for field sqlIdentifier
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlShowCreateCatalog cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlDescribeCatalog does not contain a getter 
for field catalogName
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlDescribeCatalog does not contain a setter 
for field catalogName
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlDescribeCatalog cannot be used as a POJO 
type because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlDescribeJob does not contain a getter for 
field jobId
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.dql.SqlDescribeJob does not contain a setter for 
field jobId
   2025-04-11 15:22:47.620 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.dql.SqlDescribeJob cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable does not contain a 
setter for field tableName
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable cannot be used as a 
POJO type because not all fields are valid POJO fields, and must be processed 
as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh does not 
contain a setter for field partitionSpec
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh cannot be used 
as a POJO type because not all fields are valid POJO fields, and must be 
processed as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend does not 
contain a getter for field tableIdentifier
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend does not 
contain a setter for field tableIdentifier
   2025-04-11 15:22:47.621 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend cannot be used 
as a POJO type because not all fields are valid POJO fields, and must be 
processed as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.622 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume does not 
contain a setter for field propertyList
   2025-04-11 15:22:47.622 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume cannot be used 
as a POJO type because not all fields are valid POJO fields, and must be 
processed as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.622 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable does not contain a 
getter for field tableIdentifier
   2025-04-11 15:22:47.622 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: class 
org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable does not contain a 
setter for field tableIdentifier
   2025-04-11 15:22:47.622 INFO main 
org.apache.flink.api.java.typeutils.TypeExtractor: Class class 
org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable cannot be used as a 
POJO type because not all fields are valid POJO fields, and must be processed 
as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2025-04-11 15:22:47.625 INFO main 
org.apache.kyuubi.engine.flink.FlinkSQLEngine: Bootstrap SQL finished.
   2025-04-11 15:22:47.635 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager: 
anonymous's FlinkSessionImpl with SessionHandle 
[44b7cd3b-e2cc-4a4b-8427-6d0d27d30617] is opened, current opening sessions 1
   2025-04-11 15:22:47.700 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=90c8ecf8-0129-43cb-99db-3be7da880782        opType=ExecuteStatement 
state=INITIALIZED       user=anonymous  
session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.701 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.log.OperationLog: Creating operation log 
file 
/Users/juiceyang/Tools/apache-kyuubi-1.10.1-bin/work/engine_operation_logs/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617/90c8ecf8-0129-43cb-99db-3be7da880782
   2025-04-11 15:22:47.701 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=90c8ecf8-0129-43cb-99db-3be7da880782        opType=ExecuteStatement 
state=PENDING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.702 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing 
anonymous's query[90c8ecf8-0129-43cb-99db-3be7da880782]: PENDING_STATE -> 
RUNNING_STATE, statement:
   CREATE TABLE orders (
       order_number BIGINT
   ) WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1'
   );
   2025-04-11 15:22:47.702 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=90c8ecf8-0129-43cb-99db-3be7da880782        opType=ExecuteStatement 
state=RUNNING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.731 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing 
anonymous's query[90c8ecf8-0129-43cb-99db-3be7da880782]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 0.029 seconds
   2025-04-11 15:22:47.731 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=90c8ecf8-0129-43cb-99db-3be7da880782        opType=ExecuteStatement 
state=FINISHED  user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.801 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=e6da332f-5f63-44dc-b209-d52267a907f5        opType=ExecuteStatement 
state=INITIALIZED       user=anonymous  
session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.801 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.log.OperationLog: Creating operation log 
file 
/Users/juiceyang/Tools/apache-kyuubi-1.10.1-bin/work/engine_operation_logs/44b7cd3b-e2cc-4a4b-8427-6d0d27d30617/e6da332f-5f63-44dc-b209-d52267a907f5
   2025-04-11 15:22:47.801 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=e6da332f-5f63-44dc-b209-d52267a907f5        opType=ExecuteStatement 
state=PENDING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:47.801 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing 
anonymous's query[e6da332f-5f63-44dc-b209-d52267a907f5]: PENDING_STATE -> 
RUNNING_STATE, statement:
   select * from orders
   2025-04-11 15:22:47.802 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=e6da332f-5f63-44dc-b209-d52267a907f5        opType=ExecuteStatement 
state=RUNNING   user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   2025-04-11 15:22:48.400 INFO Flink-RestClusterClient-IO-thread-1 
org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'SELECT 
`orders`.`order_number`
   FROM `default_catalog`.`default_database`.`orders` AS `orders`' 
(5eb98ff0d40172b2f2b39c285af3980e).
   2025-04-11 15:22:48.511 INFO Flink-RestClusterClient-IO-thread-4 
org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted 
job 'SELECT `orders`.`order_number`
   FROM `default_catalog`.`default_database`.`orders` AS `orders`' 
(5eb98ff0d40172b2f2b39c285af3980e) to 'http://localhost:8081'.
   2025-04-11 15:22:48.522 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing 
anonymous's query[e6da332f-5f63-44dc-b209-d52267a907f5]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 0.72 seconds
   2025-04-11 15:22:48.522 INFO FlinkThriftBinaryFrontendServiceHandler-Pool: 
Thread-20 org.apache.kyuubi.operation.OperationAuditLogger: 
operation=e6da332f-5f63-44dc-b209-d52267a907f5        opType=ExecuteStatement 
state=FINISHED  user=anonymous  session=44b7cd3b-e2cc-4a4b-8427-6d0d27d30617
   ```
   
   ### Kyuubi Server Configurations
   
   ```yaml
   kyuubi.frontend.bind.host                localhost
   kyuubi.frontend.protocols                THRIFT_BINARY,REST
   kyuubi.frontend.thrift.binary.bind.port  10009
   kyuubi.frontend.rest.bind.port           10099
   kyuubi.engine.flink.extra.classpath      
/Users/juiceyang/Tools/hadoop-3.1.3/etc/hadoop:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/common/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/common/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/hdfs/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/mapreduce/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn/lib/*:/Users/juiceyang/Tools/hadoop-3.1.3/share/hadoop/yarn/*
   ```
   
   ### Kyuubi Engine Configurations
   
   ```yaml
   
################################################################################
   #  Licensed to the Apache Software Foundation (ASF) under one
   #  or more contributor license agreements.  See the NOTICE file
   #  distributed with this work for additional information
   #  regarding copyright ownership.  The ASF licenses this file
   #  to you under the Apache License, Version 2.0 (the
   #  "License"); you may not use this file except in compliance
   #  with the License.  You may obtain a copy of the License at
   #
   #      http://www.apache.org/licenses/LICENSE-2.0
   #
   #  Unless required by applicable law or agreed to in writing, software
   #  distributed under the License is distributed on an "AS IS" BASIS,
   #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   #  See the License for the specific language governing permissions and
   #  limitations under the License.
   
################################################################################
   
   # These parameters are required for Java 17 support.
   # They can be safely removed when using Java 8/11.
   env:
     java:
       opts:
         all: --add-exports=java.base/sun.net.util=ALL-UNNAMED 
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.text=ALL-UNNAMED 
--add-opens=java.base/java.time=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
   --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
   
   
#==============================================================================
   # Common
   
#==============================================================================
   
   jobmanager:
     # The host interface the JobManager will bind to. By default, this is 
localhost, and will prevent
     # the JobManager from communicating outside the machine/container it is 
running on.
     # On YARN this setting will be ignored if it is set to 'localhost', 
defaulting to 0.0.0.0.
     # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
     #
     # To enable this, set the bind-host address to one that has access to an 
outside facing network
     # interface, such as 0.0.0.0.
     bind-host: localhost
     rpc:
       # The external address of the host on which the JobManager runs and can 
be
       # reached by the TaskManagers and any clients which want to connect. 
This setting
       # is only used in Standalone mode and may be overwritten on the 
JobManager side
       # by specifying the --host <hostname> parameter of the bin/jobmanager.sh 
executable.
       # In high availability mode, if you use the bin/start-cluster.sh script 
and setup
       # the conf/masters file, this will be taken care of automatically. Yarn
       # automatically configure the host name based on the hostname of the 
node where the
       # JobManager runs.
       address: localhost
       # The RPC port where the JobManager is reachable.
       port: 6123
     memory:
       process:
         # The total process memory size for the JobManager.
         # Note this accounts for all memory usage within the JobManager 
process, including JVM metaspace and other overhead.
         size: 1600m
     execution:
       # The failover strategy, i.e., how the job computation recovers from 
task failures.
       # Only restart tasks that may have been affected by the task failure, 
which typically includes
       # downstream tasks and potentially upstream tasks if their produced data 
is no longer available for consumption.
       failover-strategy: region
   
   taskmanager:
     # The host interface the TaskManager will bind to. By default, this is 
localhost, and will prevent
     # the TaskManager from communicating outside the machine/container it is 
running on.
     # On YARN this setting will be ignored if it is set to 'localhost', 
defaulting to 0.0.0.0.
     # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
     #
     # To enable this, set the bind-host address to one that has access to an 
outside facing network
     # interface, such as 0.0.0.0.
     bind-host: localhost
     # The address of the host on which the TaskManager runs and can be reached 
by the JobManager and
     # other TaskManagers. If not specified, the TaskManager will try different 
strategies to identify
     # the address.
     #
     # Note this address needs to be reachable by the JobManager and forward 
traffic to one of
     # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
     #
     # Note also that unless all TaskManagers are running on the same machine, 
this address needs to be
     # configured separately for each TaskManager.
     host: localhost
     # The number of task slots that each TaskManager offers. Each slot runs 
one parallel pipeline.
     numberOfTaskSlots: 1
     memory:
       process:
         # The total process memory size for the TaskManager.
         #
         # Note this accounts for all memory usage within the TaskManager 
process, including JVM metaspace and other overhead.
         # To exclude JVM metaspace and overhead, please, use total Flink 
memory size instead of 'taskmanager.memory.process.size'.
         # It is not recommended to set both 'taskmanager.memory.process.size' 
and Flink memory.
         size: 1728m
   
   parallelism:
     # The parallelism used for programs that did not specify and other 
parallelism.
     default: 1
   
   # # The default file system scheme and authority.
   # # By default file paths without scheme are interpreted relative to the 
local
   # # root file system 'file:///'. Use this to override the default and 
interpret
   # # relative paths relative to a different file system,
   # # for example 'hdfs://mynamenode:12345'
   # fs:
   #   default-scheme: hdfs://mynamenode:12345
   
   
#==============================================================================
   # High Availability
   
#==============================================================================
   
   # high-availability:
   #   # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
   #   type: zookeeper
   #   # The path where metadata for master recovery is persisted. While 
ZooKeeper stores
   #   # the small ground truth for checkpoint and leader election, this 
location stores
   #   # the larger objects, like persisted dataflow graphs.
   #   #
   #   # Must be a durable file system that is accessible from all nodes
   #   # (like HDFS, S3, Ceph, nfs, ...)
   #   storageDir: hdfs:///flink/ha/
   #   zookeeper:
   #     # The list of ZooKeeper quorum peers that coordinate the 
high-availability
   #     # setup. This must be a list of the form:
   #     # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
   #     quorum: localhost:2181
   #     client:
   #       # ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
   #       # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" 
(ZOO_OPEN_ACL_UNSAFE)
   #       # The default value is "open" and it can be changed to "creator" if 
ZK security is enabled
   #       acl: open
   
   
#==============================================================================
   # Fault tolerance and checkpointing
   
#==============================================================================
   
   # The backend that will be used to store operator state checkpoints if
   # checkpointing is enabled. Checkpointing is enabled when 
execution.checkpointing.interval > 0.
   
   # # Execution checkpointing related parameters. Please refer to 
CheckpointConfig and CheckpointingOptions for more details.
   # execution:
   #   checkpointing:
   #     interval: 3min
   #     externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, 
RETAIN_ON_CANCELLATION]
   #     max-concurrent-checkpoints: 1
   #     min-pause: 0
   #     mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
   #     timeout: 10min
   #     tolerable-failed-checkpoints: 0
   #     unaligned: false
   
   # state:
   #   backend:
   #     # Supported backends are 'hashmap', 'rocksdb', or the
   #     # <class-name-of-factory>.
   #     type: hashmap
   #     # Flag to enable/disable incremental checkpoints for backends that
   #     # support incremental checkpoints (like the RocksDB state backend).
   #     incremental: false
   #   checkpoints:
   #       # Directory for checkpoints filesystem, when using any of the 
default bundled
   #       # state backends.
   #       dir: hdfs://namenode-host:port/flink-checkpoints
   #   savepoints:
   #       # Default target directory for savepoints, optional.
   #       dir: hdfs://namenode-host:port/flink-savepoints
   
   
#==============================================================================
   # Rest & web frontend
   
#==============================================================================
   
   rest:
     # The address to which the REST client will connect to
     address: localhost
     # The address that the REST & web server binds to
     # By default, this is localhost, which prevents the REST & web server from
     # being able to communicate outside of the machine/container it is running 
on.
     #
     # To enable this, set the bind address to one that has access to 
outside-facing
     # network interface, such as 0.0.0.0.
     bind-address: localhost
     # # The port to which the REST client connects to. If rest.bind-port has
     # # not been specified, then the server will bind to this port as well.
     # port: 8081
     # # Port range for the REST and web server to bind to.
     # bind-port: 8080-8090
   
   # web:
   #   submit:
   #     # Flag to specify whether job submission is enabled from the web-based
   #     # runtime monitor. Uncomment to disable.
   #     enable: false
   #   cancel:
   #     # Flag to specify whether job cancellation is enabled from the 
web-based
   #     # runtime monitor. Uncomment to disable.
   #     enable: false
   
   
#==============================================================================
   # Advanced
   
#==============================================================================
   
   # io:
   #   tmp:
   #     # Override the directories for temporary files. If not specified, the
   #     # system-specific Java temporary directory (java.io.tmpdir property) 
is taken.
   #     #
   #     # For framework setups on Yarn, Flink will automatically pick up the
   #     # containers' temp directories without any need for configuration.
   #     #
   #     # Add a delimited list for multiple directories, using the system 
directory
   #     # delimiter (colon ':' on unix) or a comma, e.g.:
   #     # /data1/tmp:/data2/tmp:/data3/tmp
   #     #
   #     # Note: Each directory entry is read from and written to by a 
different I/O
   #     # thread. You can include the same directory multiple times in order 
to create
   #     # multiple I/O threads against that directory. This is for example 
relevant for
   #     # high-throughput RAIDs.
   #     dirs: /tmp
   
   # classloader:
   #   resolve:
   #     # The classloading resolve order. Possible values are 'child-first' 
(Flink's default)
   #     # and 'parent-first' (Java's default).
   #     #
   #     # Child first classloading allows users to use different 
dependency/library
   #     # versions in their application than those in the classpath. Switching 
back
   #     # to 'parent-first' may help with debugging dependency issues.
   #     order: child-first
   
   # The amount of memory going to the network stack. These numbers usually need
   # no tuning. Adjusting them may be necessary in case of an "Insufficient 
number
   # of network buffers" error. The default min is 64MB, the default max is 1GB.
   #
   # taskmanager:
   #   memory:
   #     network:
   #       fraction: 0.1
   #       min: 64mb
   #       max: 1gb
   
   
#==============================================================================
   # Flink Cluster Security Configuration
   
#==============================================================================
   
   # Kerberos authentication for various components - Hadoop, ZooKeeper, and 
connectors -
   # may be enabled in four steps:
   # 1. configure the local krb5.conf file
   # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ 
kinit)
   # 3. make the credentials available to various JAAS login contexts
   # 4. configure the connector to use JAAS/SASL
   
   # # The below configure how Kerberos credentials are provided. A keytab will 
be used instead of
   # # a ticket cache if the keytab path and principal are set.
   # security:
   #   kerberos:
   #     login:
   #       use-ticket-cache: true
   #       keytab: /path/to/kerberos/keytab
   #       principal: flink-user
   #       # The configuration below defines which JAAS login contexts
   #       contexts: Client,KafkaClient
   
   
#==============================================================================
   # ZK Security Configuration
   
#==============================================================================
   
   # zookeeper:
   #   sasl:
   #     # Below configurations are applicable if ZK ensemble is configured for 
security
   #     #
   #     # Override below configuration to provide custom ZK service name if 
configured
   #     # zookeeper.sasl.service-name: zookeeper
   #     #
   #     # The configuration below must match one of the values set in 
"security.kerberos.login.contexts"
   #     login-context-name: Client
   
   
#==============================================================================
   # HistoryServer
   
#==============================================================================
   
   # The HistoryServer is started and stopped via bin/historyserver.sh 
(start|stop)
   #
   # jobmanager:
   #   archive:
   #     fs:
   #       # Directory to upload completed jobs to. Add this directory to the 
list of
   #       # monitored directories of the HistoryServer as well (see below).
   #       dir: hdfs:///completed-jobs/
   
   # historyserver:
   #   web:
   #     # The address under which the web-based HistoryServer listens.
   #     address: 0.0.0.0
   #     # The port under which the web-based HistoryServer listens.
   #     port: 8082
   #   archive:
   #     fs:
   #       # Comma separated list of directories to monitor for completed jobs.
   #       dir: hdfs:///completed-jobs/
   #       # Interval in milliseconds for refreshing the monitored directories.
   #       fs.refresh-interval: 10000
   ```
   
   ### Additional context
   
   Currently, the fetch timeout [is thrown inside each `resultSet.getData.next()` call](https://github.com/apache/kyuubi/blob/9047151d8dd24cb5834770c9b216438838b7f9ed/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala#L128).
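   
   To see why a per-element timeout never fires in this scenario, consider the simplified sketch below (plain Java, purely illustrative; `perElement` and its parameters are not from the Kyuubi codebase). Each `next()` call gets a fresh timeout budget, so a source emitting one row per second never exceeds a 30-second limit, while the surrounding loop still runs for ~1000 seconds before a 1000-row batch is full.
   ```java
   import java.util.Iterator;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   
   public class TimeoutScopes {
     // Per-element timeout: the budget resets for every row, mirroring the
     // behavior described above, so a steady 1 row/s source never trips it.
     static <T> void perElement(Iterator<T> it, int batchSize, long timeoutSec)
         throws Exception {
       ExecutorService pool = Executors.newSingleThreadExecutor();
       try {
         for (int i = 0; i < batchSize && it.hasNext(); i++) {
           Future<T> row = pool.submit(it::next);
           row.get(timeoutSec, TimeUnit.SECONDS); // fresh budget per row
         }
       } finally {
         pool.shutdownNow();
       }
     }
   }
   ```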
   
   I assume it would be more intuitive to throw the timeout exception when the whole while loop takes longer than the configured timeout, as sketched below. (I'm not fluent in Scala, so please treat this as a sketch.)
   ```scala
   import scala.concurrent.{Await, Future, TimeoutException}
   import scala.concurrent.ExecutionContext.Implicits.global
   
   try {
     // there could be null values at the end of the batch
     // because Flink could return an EOS
     var rows = 0
     val future = Future {
       while (resultSet.getData.hasNext && rows < rowSetSize) {
         Option(resultSet.getData.next()).foreach { r => batch += r; rows += 1 }
       }
     }
     // bound the whole fetch loop, not each element, by the fetch timeout
     Await.result(future, fetchTimeout)
   } catch {
     case e: TimeoutException =>
       // ignore and return the current batch if there's some data
       // otherwise, rethrow the timeout exception
       if (batch.nonEmpty) {
         debug(s"Timeout fetching more data for $opType operation. " +
           s"Returning the current fetched data.")
       } else {
         throw e
       }
   }
   ```
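   
   One caveat with this sketch: on timeout, `Await.result` only stops waiting; the future's fetch loop keeps running on its executor. A real patch would likely also need to cancel or interrupt the underlying fetch, and use the engine's executor instead of the global `ExecutionContext`.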
   
   ### Are you willing to submit PR?
   
   - [x] Yes. I would be willing to submit a PR with guidance from the Kyuubi 
community to fix.
   - [ ] No. I cannot submit a PR at this time.

