SQL statements:
// Registers three MySQL-backed tables in the Flink catalog through the JDBC
// connector, prints the two source tables, and then fills new_table from them.
//
// IMPORTANT: a Flink "CREATE TABLE ... WITH ('connector' = 'jdbc')" statement
// only registers metadata in Flink's catalog. It does NOT create the table in
// MySQL. Every physical table (detal, shangping, new_table) must already exist
// in database test_1, otherwise any read or write fails at runtime with
// "Table 'test_1.<name>' doesn't exist".

// Connection options shared by all three tables.
// NOTE(review): credentials are hard-coded here; consider loading them from
// configuration instead of source code.
String jdbcOptions =
        " 'connector' = 'jdbc',\n" +
        " 'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        " 'username' = 'ark_admin', \n" +
        " 'password' = 'ark_Admin@0927Da', \n";

// Source table 1: product details.
String sql1 = "CREATE TABLE detal (\n" +
        " id INT,\n" +
        " produceId VARCHAR,\n" +
        " color VARCHAR,\n" +
        " size VARCHAR,\n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        jdbcOptions +
        " 'table-name' = 'detal'\n" +
        ")\n";
tenv.executeSql(sql1).print();
String sql2 = "SELECT * FROM detal";
tenv.executeSql(sql2).print();

// Source table 2: order/product mapping.
String sql3 = "CREATE TABLE shangping (\n" +
        " id INT,\n" +
        " orderId INT,\n" +
        " produceId VARCHAR,\n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        jdbcOptions +
        " 'table-name' = 'shangping'\n" +
        ")\n";
tenv.executeSql(sql3).print();
String sql4 = "SELECT * FROM shangping";
tenv.executeSql(sql4).print();

// Sink table. No PRIMARY KEY is declared, so the JDBC sink writes in append
// mode (plain INSERT statements) rather than upsert mode.
String sql5 = "CREATE TABLE new_table (\n" +
        " id INT,\n" +
        " orderId INT,\n" +
        " produceId VARCHAR,\n" +
        " color VARCHAR,\n" +
        " size VARCHAR\n" +
        ") WITH (\n" +
        jdbcOptions +
        " 'table-name' = 'new_table'\n" +
        ")\n";
tenv.executeSql(sql5).print();

// Reading the sink only works once test_1.new_table physically exists in
// MySQL; Flink will not create it on demand.
String sql6 = "SELECT * FROM new_table";
tenv.executeSql(sql6).print();

// new_table has five columns while detal has only four, so
// "INSERT INTO new_table SELECT * FROM detal" cannot match the sink schema.
// The sink columns are exactly shangping's columns plus detal's color/size,
// so the rows are built by joining the two sources.
// NOTE(review): joining on produceId is inferred from the column names;
// confirm that this is the intended key.
String insertSql = "INSERT INTO new_table " +
        "SELECT s.id, s.orderId, s.produceId, d.color, d.`size` " +
        "FROM shangping AS s " +
        "JOIN detal AS d ON s.produceId = d.produceId";
// executeSql submits the INSERT job asynchronously; wait on the returned
// TableResult if the program must not exit before the job finishes.
tenv.executeSql(insertSql);
????:??????????????sink????????????????????
Error output:
java.lang.IllegalArgumentException: open() failed.Table 'test_1.new_table'
doesn't exist
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:215)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table
'test_1.new_table' doesn't exist
at
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
at com.mysql.jdbc.Util.getInstance(Util.java:381)
at
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1030)
at
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3558)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3490)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1959)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2109)
at
com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2643)
at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2077)
at
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2228)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:212)
... 4 common frames omitted
??