Hello,

我遇到了与下面这封邮件中描述的类似的问题: https://www.mail-archive.com/[email protected]/msg03916.html
根据该邮件中给出的解决方法,我已经设置了 timezone,但问题仍然没有解决,请各位帮忙看一下。

public static void main (String[] args) throws Exception {
        // set up the streaming execution environment
ClientConfig clientConfig =
ClientConfig.builder().controllerURI(URI.create("tcp://192.168.188.130:9090")).build();
        StreamManager streamManager = StreamManager.create(clientConfig);
        streamManager.createStream("Demo",
"result",StreamConfiguration.builder().build());
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        EnvironmentSettings envSetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
       StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env, envSetting);
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
       String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT,
datatime BIGINT, list ARRAY <ROW(id STRING, v FLOAT, q INTEGER)>, ts
AS TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts -
INTERVAL '5' SECOND)" +
                " WITH (" +
                "'connector.type' = 'pravega'," +
                "'connector.version' = '1'," +
                "'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090'," +
                "'connector.connection-config.default-scope' = 'Demo'," +
                "'connector.reader.stream-info.0.stream' = 'test'," +
                "'format.type' = 'json'," +
                "'format.fail-on-missing-field' = 'false', " +
                "'update-mode' = 'append')";
        tableEnv.sqlUpdate(sqlDdlAnaTable);        String
sqlDdlSinkTable = "CREATE TABLE tb_sink" +
                "(id STRING, " +
                "wStart TIMESTAMP(3) , " +
                "v FLOAT)" +
                " WITH (" +
                "'connector.type' = 'pravega'," +
                "'connector.version' = '1'," +
                "'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090'," +
                "'connector.connection-config.default-scope' = 'Demo'," +
                "'connector.writer.stream' = 'result'," +
                "'connector.writer.routingkey-field-name' = 'id'," +
                "'connector.writer.mode' = 'atleast_once'," +
                "'format.type' = 'json'," +
                "'update-mode' = 'append')";
        tableEnv.sqlUpdate(sqlDdlSinkTable);        String sqlJson =
"SELECT ts, type, l.id AS id, l.v AS v, l.q AS q " +
                "FROM ana_Source " +
                "CROSS JOIN UNNEST(list) as l (id,v,q)";
        Table tableJsonRecord = tableEnv.sqlQuery(sqlJson);
        tableEnv.registerTable("tb_JsonRecord", tableJsonRecord);
        System.out.println("------------------print {}
schema------------------" + "tb_JsonRecord");
        tableJsonRecord.printSchema();
        //tableEnv.toAppendStream(tableRecord, Row.class).print();
   String sqlAna = "SELECT ts, id, v " +
                "FROM tb_JsonRecord " +
                "WHERE q=1 AND type=1";
        Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);
        tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);
        System.out.println("------------------print {}
schema------------------" + "tb_AnaRecord");
        tableAnaRecord.printSchema();
        //tableEnv.toAppendStream(tableAnaRecord, Row.class).print();
      String sqlAnaAvg = "SELECT id, " +
                "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, "  +
                "AVG(v) FROM tb_AnaRecord " +
                "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
        Table tableAvgRecord = tableEnv.sqlQuery(sqlAnaAvg);
        tableEnv.registerTable("tb_AvgRecord", tableAvgRecord);
        System.out.println("------------------print {}
schema------------------" + "tb_AvgRecord");
        tableAvgRecord.printSchema();
        tableEnv.toAppendStream(tableAvgRecord, Row.class).print();/*
        String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +
                "SELECT id, " +
                "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, "  +
                "AVG(v) FROM tb_AnaRecord " +
                "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
 tableEnv.sqlUpdate(sqlAnaAvg);*/        tableEnv.execute("Streaming
Job");

回复