Hi Yunfeng,

By default, fromDataStream does not propagate watermarks into the Table API, because the Table API needs a time attribute in the schema that corresponds to the watermarking. The time attribute will also be put back into the stream record during toDataStream.

Please take a look at:

https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream

Esp. example 4 should solve your use case:

// === EXAMPLE 4 ===

// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API

// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());
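
A rough sketch of how this could be applied to your program below, using the `input` and `tEnv` variables from it (untested; extracting the event time from the integer field `f0` and using a monotonous-timestamps strategy are just assumptions for illustration, so adapt them to your data):

```java
// Sketch only (assumes imports of WatermarkStrategy, Schema, Table, DataStream, Row).
// Declare timestamps and watermarks on the DataStream first ...
DataStream<Row> withTimestamps =
    input.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Row>forMonotonousTimestamps()
            .withTimestampAssigner(
                // hypothetical: treat field "f0" (position 0) as epoch millis
                (row, ts) -> ((Integer) row.getField(0)).longValue()));

// ... then expose them as a rowtime attribute when converting to a Table ...
Table table =
    tEnv.fromDataStream(
        withTimestamps,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());

// ... toDataStream writes the rowtime attribute back into the StreamRecord
// timestamp and emits watermarks again, so a downstream window (or your
// second TestOperator) will see them.
DataStream<Row> result = tEnv.toDataStream(table);
```

With such a rowtime attribute in place, window operations on the converted stream should fire as expected.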

I hope this helps.

Regards,
Timo


On 04.11.21 12:00, Yunfeng Zhou wrote:
Hi,

I found that if I convert a DataStream into a Table and back into a DataStream, the watermark of the stream will be lost. As shown in the program below, the TestOperator before the conversion has its processWatermark() method triggered and the watermark value printed, but the one after the conversion does not.

Is my observation correct? If so, is this the expected behavior of the conversion API? My current work requires converting a table into a DataStream and applying a window operation to it, but this problem blocks me from creating a window.

Regards,
Yunfeng

```java
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SimpleTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Row> input = env.fromElements(Row.of(1));

        // Operator before the conversion: its processWatermark() is triggered.
        input = input.transform(
                "TestOperator",
                new RowTypeInfo(
                        new TypeInformation[] {TypeInformation.of(Integer.class)},
                        new String[] {"f0"}),
                new TestOperator("0"));

        // Round-trip through the Table API.
        input = tEnv.toDataStream(tEnv.fromDataStream(input));

        // Operator after the conversion: its processWatermark() is not triggered.
        input = input.transform(
                "TestOperator",
                new RowTypeInfo(
                        new TypeInformation[] {TypeInformation.of(Integer.class)},
                        new String[] {"f0"}),
                new TestOperator("1"));

        System.out.println(IteratorUtils.toList(input.executeAndCollect()));
    }

    private static class TestOperator extends AbstractStreamOperator<Row>
            implements OneInputStreamOperator<Row, Row> {

        private final String prefix;

        private TestOperator(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void processElement(StreamRecord<Row> streamRecord) throws Exception {
            System.out.println(prefix + streamRecord.getValue());
            output.collect(streamRecord);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println(prefix + mark.toString());
        }
    }
}
```
