??????
??????????????
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);
DataStream<Row> rowDataStreamSource = bsEnv.addSource(new
SourceFunction<Row>() {
@Override
public void run(SourceContext<Row> sourceContext) throws Exception {
while (true) {
Thread.sleep(1000);
Row row = new Row(RowKind.INSERT, 2);
row.setField(0, "a");
row.setField(1, new Timestamp(System.currentTimeMillis()));
sourceContext.collect(row);
}
}
@Override
public void cancel() {
}
})
.returns(new RowTypeInfo(new TypeInformation[]{Types.STRING,
Types.SQL_TIMESTAMP}))
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
Table table = bsTableEnv.fromDataStream(rowDataStreamSource, $("a"),
$("b").rowtime());
table.execute().print();??????????$("b").rowtime()????????????????rowtime????????table????????????????????????????????|
+I | a | +1705471-09-26T16:47... | | +I |
a | +1705471-09-26T16:47... | | +I |
a | +1705471-09-26T16:47...
|????????????????????????????????????????????rowtime????????????long??timestamp????????LocalDateTime??????????????????????????????LocalDateTime??????