Hello,
我遇到了类似https://www.mail-archive.com/[email protected]/msg03916.html
中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。
public static void main (String[] args) throws Exception {
// set up the streaming execution environment
ClientConfig clientConfig =
ClientConfig.builder().controllerURI(URI.create("tcp://192.168.188.130:9090")).build();
StreamManager streamManager = StreamManager.create(clientConfig);
streamManager.createStream("Demo",
"result",StreamConfiguration.builder().build());
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
EnvironmentSettings envSetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env, envSetting);
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT,
datatime BIGINT, list ARRAY <ROW(id STRING, v FLOAT, q INTEGER)>, ts
AS TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts -
INTERVAL '5' SECOND)" +
" WITH (" +
"'connector.type' = 'pravega'," +
"'connector.version' = '1'," +
"'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090'," +
"'connector.connection-config.default-scope' = 'Demo'," +
"'connector.reader.stream-info.0.stream' = 'test'," +
"'format.type' = 'json'," +
"'format.fail-on-missing-field' = 'false', " +
"'update-mode' = 'append')";
tableEnv.sqlUpdate(sqlDdlAnaTable); String
sqlDdlSinkTable = "CREATE TABLE tb_sink" +
"(id STRING, " +
"wStart TIMESTAMP(3) , " +
"v FLOAT)" +
" WITH (" +
"'connector.type' = 'pravega'," +
"'connector.version' = '1'," +
"'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090'," +
"'connector.connection-config.default-scope' = 'Demo'," +
"'connector.writer.stream' = 'result'," +
"'connector.writer.routingkey-field-name' = 'id'," +
"'connector.writer.mode' = 'atleast_once'," +
"'format.type' = 'json'," +
"'update-mode' = 'append')";
tableEnv.sqlUpdate(sqlDdlSinkTable); String sqlJson =
"SELECT ts, type, l.id AS id, l.v AS v, l.q AS q " +
"FROM ana_Source " +
"CROSS JOIN UNNEST(list) as l (id,v,q)";
Table tableJsonRecord = tableEnv.sqlQuery(sqlJson);
tableEnv.registerTable("tb_JsonRecord", tableJsonRecord);
System.out.println("------------------print {}
schema------------------" + "tb_JsonRecord");
tableJsonRecord.printSchema();
//tableEnv.toAppendStream(tableRecord, Row.class).print();
String sqlAna = "SELECT ts, id, v " +
"FROM tb_JsonRecord " +
"WHERE q=1 AND type=1";
Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);
tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);
System.out.println("------------------print {}
schema------------------" + "tb_AnaRecord");
tableAnaRecord.printSchema();
//tableEnv.toAppendStream(tableAnaRecord, Row.class).print();
String sqlAnaAvg = "SELECT id, " +
"TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " +
"AVG(v) FROM tb_AnaRecord " +
"GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
Table tableAvgRecord = tableEnv.sqlQuery(sqlAnaAvg);
tableEnv.registerTable("tb_AvgRecord", tableAvgRecord);
System.out.println("------------------print {}
schema------------------" + "tb_AvgRecord");
tableAvgRecord.printSchema();
tableEnv.toAppendStream(tableAvgRecord, Row.class).print();/*
String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +
"SELECT id, " +
"TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " +
"AVG(v) FROM tb_AnaRecord " +
"GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
tableEnv.sqlUpdate(sqlAnaAvg);*/ tableEnv.execute("Streaming
Job");