Paul Wu created FLINK-8356:
------------------------------

             Summary: JDBCAppendTableSink does not work for Hbase Phoenix 
Driver 
                 Key: FLINK-8356
                 URL: https://issues.apache.org/jira/browse/FLINK-8356
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.4.0
            Reporter: Paul Wu


The following code runs without errors, but the data is not inserted into the 
HBase table. However, it does work for MySQL (see the commented out code). The 
Phoenix driver is from 
https://mvnrepository.com/artifact/org.apache.phoenix/phoenix/4.7.0-HBase-1.1

String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE 
SEGMENTENDTIME, cast (imsi as varchar) imsi, cast(imei as varchar) imei from ts 
";
        
        Table table = ste.sqlQuery(query);
        JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder();
        jdbc.setDrivername("org.apache.phoenix.jdbc.PhoenixDriver");
        jdbc.setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure");
        jdbc.setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA 
(SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)");
//     JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder();
//        jdbc.setDrivername("com.mysql.jdbc.Driver");
//        jdbc.setDBUrl("jdbc:mysql://localhost/test");
//        jdbc.setUsername("root").setPassword("");
//        jdbc.setQuery("insert INTO GEO_ANALYTICS_STREAMING_DATA 
(SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)");
//        jdbc.setBatchSize(1);
        jdbc.setParameterTypes(Types.SQL_DATE, Types.SQL_DATE, Types.STRING, 
Types.STRING);
        JDBCAppendTableSink sink = jdbc.build();
        table.writeToSink(sink);



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to