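[Note: the original text of this paragraph was lost to a character-encoding error; the
wording below is reconstructed from the surviving terms and from the code that follows.]
Hello, a question about Flink 1.12: Flink is described as unifying stream and batch
processing, so that a job written against the DataStream API can be run as a batch job by
setting RuntimeExecutionMode.BATCH. However, when the same aggregation query is executed
through a StreamTableEnvironment created on top of such an environment, the printed result
differs from the one produced by a TableEnvironment created in batch mode. The reproduction
code and both observed outputs are shown below.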
package com.meritdata.cloud.tempo.dw.flink.test.bug;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class JDBCTest {
public static void main(String[] args) {
test();
/**
* ??????????
* EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
* .useBlinkPlanner().inBatchMode().build();
* TableEnvironment bbTableEnv =
TableEnvironment.create(bbSettings);
* +--------------------------------+----------------------+
* | a | EXPR$1 |
* +--------------------------------+----------------------+
* | 2 | 1 |
* | 3 | 2 |
* | 1 | 2 |
* | 4 | 1 |
* +--------------------------------+----------------------+
*/
// test1();
/**
* ??????API????????
* StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
* streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
* StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv);
* +----+--------------------------------+----------------------+
* | op | a | EXPR$1 |
* +----+--------------------------------+----------------------+
* | +I | 2 | 1 |
* | +I | 1 | 1 |
* | +I | 4 | 1 |
* | -U | 1 | 1 |
* | +U | 1 | 2 |
* | +I | 3 | 1 |
* | -U | 3 | 1 |
* | +U | 3 | 2 |
* +----+--------------------------------+----------------------+
*/
}
public static void test() {
EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
bbTableEnv.executeSql("CREATE TABLE ab (" +
" a STRING, " +
" b INT " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' =
'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
" 'username' = 'root'," +
" 'password' = 'root'," +
" 'table-name' = 'ab'" +
" )");
bbTableEnv.sqlQuery("select a, count(b) from ab group by
a").execute().print();
}
public static void test1() {
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv);
tableEnv.executeSql("CREATE TABLE ab (" +
" a STRING, " +
" b INT " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' =
'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
" 'username' = 'root'," +
" 'password' = 'root'," +
" 'table-name' = 'ab'" +
" )");
tableEnv.sqlQuery("select a, count(b) from ab group by
a").execute().print();
}
}