Hi Yunfeng,
by default the fromDataStream does not propagate watermarks into Table
API. Because Table API needs a time attribute in the schema that
corresponds to the watermarking. A time attribute will also put back
into the stream record during toDataStream.
Please take a look at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream
Esp. example 4 should solve your use case:
// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime
attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream`
before
// (not part of this example)
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
I hope this helps.
Regards,
Timo
On 04.11.21 12:00, Yunfeng Zhou wrote:
Hi,
I found that if I convert a Datastream into Table and back into
Datastream, watermark of the stream will be lost. As shown in the
program below, the TestOperator before the conversion will have its
processWatermark() method triggered and watermark value printed, but the
one after the conversion will not.
Is my observation correct? If so, is it the expected behavior of the
conversion API? My current work needs me to convert a table into
datastream and to do window operation on it, but this problem blocks me
from creating a window.
Regards,
Yunfeng
```java
public class SimpleTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Row> input = env.fromElements(Row.of(1));
input = input.transform(
"TestOperator",
new RowTypeInfo(new
TypeInformation[]{TypeInformation.of(Integer.class)}, new String[]{"f0"}),
new TestOperator("0")
);
input = tEnv.toDataStream(tEnv.fromDataStream(input));
input = input.transform(
"TestOperator",
new RowTypeInfo(new
TypeInformation[]{TypeInformation.of(Integer.class)}, new String[]{"f0"}),
new TestOperator("1")
);
System.out.println(IteratorUtils.toList(input.executeAndCollect()));
}
private static class TestOperator extends AbstractStreamOperator<Row>
implements OneInputStreamOperator<Row, Row>{
private final String prefix;
private TestOperator(String prefix) {
this.prefix = prefix;
}
@Override
public void processElement(StreamRecord<Row> streamRecord) throws
Exception {
System.out.println(prefix + streamRecord.getValue());
output.collect(streamRecord);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
System.out.println(prefix + mark.toString());
}
}
}
```