sql ????:
String sql1="CREATE TABLE detal (\n" +
        "   id INT,\n" +
        "  produceId VARCHAR,\n"+
        "  color VARCHAR,\n"+
        "  size VARCHAR,\n"+
        "  PRIMARY KEY (id) NOT ENFORCED\n"+
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        "  'username' = 'ark_admin', \n" +
        "  'password' = 'ark_Admin@0927Da', \n" +
        "  'table-name' = 'detal'\n" +
        ")\n";
tenv.executeSql(sql1).print();

String sql2 = "SELECT * FROM detal";
tenv.executeSql(sql2).print();

String sql3="CREATE TABLE shangping (\n" +
        "   id INT,\n" +
        "  orderId INT,\n"+
        "  produceId VARCHAR,\n"+
        "  PRIMARY KEY (id) NOT ENFORCED\n"+
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        "  'username' = 'ark_admin', \n" +
        "  'password' = 'ark_Admin@0927Da', \n" +
        "  'table-name' = 'shangping'\n" +
        ")\n";
tenv.executeSql(sql3).print();

String sql4 = "SELECT * FROM shangping";
tenv.executeSql(sql4).print();

String sql5="CREATE TABLE new_table (\n" +
        "   id INT,\n" +
        "  orderId INT,\n"+
        "  produceId VARCHAR,\n"+
        "  color VARCHAR,\n"+
        "  size VARCHAR\n"+
       // "  PRIMARY KEY (id) NOT ENFORCED\n"+
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://ark1:3306/test_1', \n" +
        "  'username' = 'ark_admin', \n" +
        "  'password' = 'ark_Admin@0927Da', \n" +
        "  'table-name' = 'new_table'\n" +
        ")\n";
tenv.executeSql(sql5).print();
String sql6 = "SELECT * FROM new_table";

tenv.executeSql(sql6).print();
String insertSql = "insert into new_table " +
        "select * " +
        "from detal";


????:??????????????sink????????????????????










????:

java.lang.IllegalArgumentException: open() failed.Table 'test_1.new_table' 
doesn't exist
    at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:215)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 
'test_1.new_table' doesn't exist
    at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
    at com.mysql.jdbc.Util.getInstance(Util.java:381)
    at 
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1030)
    at 
com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3558)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3490)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1959)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2109)
    at 
com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2643)
    at 
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2077)
    at 
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2228)
    at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:212)
    ... 4 common frames omitted


??


 

回复