Hi Timo,
Thanks for your response. I encountered another problem that might be
relevant to the watermark issue we discussed above.
In the test cases shown below, I create a table from some data,
convert it to a DataStream, and apply windowAll().reduce() on it. If we
need to explicitly specify a `rowtime` metadata column in order to make
the table pass timestamps to the converted DataStream, then both test
cases should print empty lists. In fact, one of them prints a list with
some data. The only difference between the two tests is the value of
some of the input data. This behavior can be reproduced with Flink ML's
latest Java environment and configurations.
Is this the expected behavior of `toDataStream`, or have I accidentally
encountered a bug?
Best regards,
Yunfeng
```java
public class SimpleTest {

    @Test
    public void testSimple1() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        Table inputTable =
                tEnv.fromValues(
                        DataTypes.ROW(
                                DataTypes.FIELD("weight", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f0", DataTypes.STRING()),
                                DataTypes.FIELD("f1", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f2", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f3", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f4", DataTypes.INT()),
                                DataTypes.FIELD("label", DataTypes.STRING())),
                        Row.of(1., "a", 1., 1., 1., 2, "l1"),
                        Row.of(1., "a", 1., 1., 1., 2, "l1"),
                        Row.of(1., "b", 0., 1., 1., 3, "l1"),
                        Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
                        Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
                        Row.of(1., "a", 1., 1., 0., 1, "l0"),
                        Row.of(2., "d", 1., 1., 0., 1, "l0"));
        DataStream<Row> input = tEnv.toDataStream(inputTable);
        System.out.println(
                IteratorUtils.toList(
                        input.windowAll(EndOfStreamWindows.get())
                                .reduce((ReduceFunction<Row>) (row, t1) -> row)
                                .executeAndCollect()));
    }

    @Test
    public void testSimple2() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        Table inputTable =
                tEnv.fromValues(
                        DataTypes.ROW(
                                DataTypes.FIELD("weight", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f0", DataTypes.STRING()),
                                DataTypes.FIELD("f1", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f2", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f3", DataTypes.DOUBLE()),
                                DataTypes.FIELD("f4", DataTypes.INT()),
                                DataTypes.FIELD("label", DataTypes.STRING())),
                        Row.of(1., "a", 1., 1., 1., 2, "l1"),
                        Row.of(1., "a", 1., 0., 1., 2, "l1"),
                        Row.of(1., "b", 0., 1., 1., 3, "l1"),
                        Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
                        Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
                        Row.of(1., "a", 1., 1.5, 0., 1, "l0"),
                        Row.of(2., "d", 1., 1., 0., 1, "l0"));
        DataStream<Row> input = tEnv.toDataStream(inputTable);
        System.out.println(
                IteratorUtils.toList(
                        input.windowAll(EndOfStreamWindows.get())
                                .reduce((ReduceFunction<Row>) (row, t1) -> row)
                                .executeAndCollect()));
    }
}
```
```java
/**
 * A {@link WindowAssigner} that assigns all elements of a bounded input stream into one window
 * pane. The results are emitted once the input stream has ended.
 */
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "EndOfStreamWindows()";
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```
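
As an aside, one workaround I could imagine (a sketch only, not verified against the Flink ML setup) is to assign timestamps and watermarks explicitly after the conversion, so that the EventTimeTrigger used by EndOfStreamWindows fires regardless of whether the table propagated a rowtime attribute. The constant timestamp `0L` here is an arbitrary choice for illustration; `tEnv` and `inputTable` are the ones from the tests above:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Sketch of a possible workaround: give every record an explicit event
// timestamp after toDataStream. Once the bounded input ends, Flink emits
// a final watermark, so the event-time window can fire deterministically.
DataStream<Row> withTimestamps =
        tEnv.toDataStream(inputTable)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Row>forMonotonousTimestamps()
                                .withTimestampAssigner((row, previousTimestamp) -> 0L));
```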
On Fri, Nov 5, 2021 at 4:29 PM Timo Walther <twal...@apache.org> wrote:
Hi Yunfeng,
by default, fromDataStream does not propagate watermarks into the Table
API, because the Table API needs a time attribute in the schema that
corresponds to the watermarking. A time attribute will also be put back
into the stream record during toDataStream.
Please take a look at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream
Esp. example 4 should solve your use case:
// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
Table table =
        tableEnv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .watermark("rowtime", "SOURCE_WATERMARK()")
                        .build());
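
Applied to the round-trip in the quoted program below, this could look roughly as follows (a sketch under the same assumption that `input` already carries record timestamps and watermarks; note that the resulting rows gain an extra `rowtime` column):

```java
// Hypothetical adaptation of example 4 to the round-trip test:
// expose the record timestamp as a `rowtime` metadata column with
// SOURCE_WATERMARK(), so the time attribute survives toDataStream
// and is written back into the stream record's timestamp.
input =
        tEnv.toDataStream(
                tEnv.fromDataStream(
                        input,
                        Schema.newBuilder()
                                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                                .watermark("rowtime", "SOURCE_WATERMARK()")
                                .build()));
```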
I hope this helps.
Regards,
Timo
On 04.11.21 12:00, Yunfeng Zhou wrote:
> Hi,
>
> I found that if I convert a DataStream into a Table and back into a
> DataStream, the watermark of the stream will be lost. As shown in the
> program below, the TestOperator before the conversion will have its
> processWatermark() method triggered and the watermark value printed,
> but the one after the conversion will not.
>
> Is my observation correct? If so, is it the expected behavior of the
> conversion API? My current work requires me to convert a table into a
> datastream and do a window operation on it, but this problem blocks me
> from creating a window.
>
> Regards,
> Yunfeng
>
> ```java
> public class SimpleTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironment();
>         env.setParallelism(1);
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>         DataStream<Row> input = env.fromElements(Row.of(1));
>
>         input =
>                 input.transform(
>                         "TestOperator",
>                         new RowTypeInfo(
>                                 new TypeInformation[] {TypeInformation.of(Integer.class)},
>                                 new String[] {"f0"}),
>                         new TestOperator("0"));
>
>         input = tEnv.toDataStream(tEnv.fromDataStream(input));
>
>         input =
>                 input.transform(
>                         "TestOperator",
>                         new RowTypeInfo(
>                                 new TypeInformation[] {TypeInformation.of(Integer.class)},
>                                 new String[] {"f0"}),
>                         new TestOperator("1"));
>
>         System.out.println(IteratorUtils.toList(input.executeAndCollect()));
>     }
>
>     private static class TestOperator extends AbstractStreamOperator<Row>
>             implements OneInputStreamOperator<Row, Row> {
>
>         private final String prefix;
>
>         private TestOperator(String prefix) {
>             this.prefix = prefix;
>         }
>
>         @Override
>         public void processElement(StreamRecord<Row> streamRecord) throws Exception {
>             System.out.println(prefix + streamRecord.getValue());
>             output.collect(streamRecord);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             super.processWatermark(mark);
>             System.out.println(prefix + mark.toString());
>         }
>     }
> }
> ```