Hi Yunfeng,

It seems this is a deeper issue with the fromValues implementation. Under the hood, it still uses the deprecated InputFormat stack, and as far as I can see, that stack does not emit a final MAX_WATERMARK. I will definitely forward this.

But toDataStream forwards watermarks correctly.
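The effect of a missing final MAX_WATERMARK on the `EndOfStreamWindows` assigner further down can be illustrated with a small, Flink-free model. It assumes the firing rule of Flink's `EventTimeTrigger` (fire once the watermark reaches the window's `maxTimestamp()`, which for a `TimeWindow` is `end - 1`). The class `EndOfStreamWindowModel` and its methods are hypothetical and only mimic the single window, the trigger, and the `reduce((row, t1) -> row)` from the test cases.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free model of one window covering [Long.MIN_VALUE, Long.MAX_VALUE)
 * with an event-time trigger. Assumption: the window fires once the watermark is
 * >= maxTimestamp(), and maxTimestamp() of TimeWindow(start, end) is end - 1.
 */
class EndOfStreamWindowModel {
    // maxTimestamp() of TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE)
    static final long WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE - 1;
    // the value carried by Flink's Watermark.MAX_WATERMARK
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final List<String> buffered = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean fired = false;

    void processElement(String value) {
        buffered.add(value);
    }

    void processWatermark(long watermark) {
        // Only a watermark at (or just below) Long.MAX_VALUE can reach the window's max timestamp.
        if (!fired && watermark >= WINDOW_MAX_TIMESTAMP && !buffered.isEmpty()) {
            // reduce((row, t1) -> row) keeps the first element of the window
            output.add(buffered.get(0));
            fired = true;
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        EndOfStreamWindowModel withFinalWatermark = new EndOfStreamWindowModel();
        withFinalWatermark.processElement("a");
        withFinalWatermark.processElement("b");
        withFinalWatermark.processWatermark(MAX_WATERMARK); // input ends with MAX_WATERMARK
        System.out.println(withFinalWatermark.output());     // [a]

        EndOfStreamWindowModel withoutFinalWatermark = new EndOfStreamWindowModel();
        withoutFinalWatermark.processElement("a");
        withoutFinalWatermark.processElement("b");
        // input ends without any final watermark
        System.out.println(withoutFinalWatermark.output());  // []
    }
}
```

Under this model, an input that ends without a final watermark never fires the window, which matches the empty result one would expect when the source does not emit MAX_WATERMARK.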

I hope this helps. Or do you think we should also rediscuss the fromDataStream watermark behavior?

Regards,
Timo


On 06.12.21 10:26, Yunfeng Zhou wrote:
Hi Timo,

Thanks for your response. I have encountered another problem that might be related to the watermark issue we discussed earlier.

In the test cases below, I create a table from some data, convert it to a DataStream, and apply windowAll().reduce() to it. If a `rowtime` metadata column must be specified explicitly for the table to pass timestamps to the converted DataStream, then both test cases should print empty lists. However, one of them prints a list containing some data. The only difference between them is that I changed some of the input values. This behavior can be reproduced with Flink ML's latest Java environment and configuration.

Is this the expected behavior of `toDataStream`, or have I accidentally encountered a bug?

Best regards,
Yunfeng

```java
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;

// EndOfStreamWindows is the custom assigner defined below (assumed to be in the same package).
public class SimpleTest {
    @Test
    public void testSimple1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table inputTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("weight", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f0", DataTypes.STRING()),
                        DataTypes.FIELD("f1", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f2", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f3", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f4", DataTypes.INT()),
                        DataTypes.FIELD("label", DataTypes.STRING())),
                Row.of(1., "a", 1., 1., 1., 2, "l1"),
                Row.of(1., "a", 1., 1., 1., 2, "l1"),
                Row.of(1., "b", 0., 1., 1., 3, "l1"),
                Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
                Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
                Row.of(1., "a", 1., 1., 0., 1, "l0"),
                Row.of(2., "d", 1., 1., 0., 1, "l0"));

        DataStream<Row> input = tEnv.toDataStream(inputTable);

        // Put all rows into one window that should only fire at the end of the input,
        // keep the first row of that window, and print the collected result.
        System.out.println(IteratorUtils.toList(input
                .windowAll(EndOfStreamWindows.get())
                .reduce((ReduceFunction<Row>) (row, t1) -> row)
                .executeAndCollect()));
    }

    @Test
    public void testSimple2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table inputTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("weight", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f0", DataTypes.STRING()),
                        DataTypes.FIELD("f1", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f2", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f3", DataTypes.DOUBLE()),
                        DataTypes.FIELD("f4", DataTypes.INT()),
                        DataTypes.FIELD("label", DataTypes.STRING())),
                Row.of(1., "a", 1., 1., 1., 2, "l1"),
                Row.of(1., "a", 1., 0., 1., 2, "l1"), // differs from testSimple1: f2 = 0.
                Row.of(1., "b", 0., 1., 1., 3, "l1"),
                Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
                Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
                Row.of(1., "a", 1., 1.5, 0., 1, "l0"), // differs from testSimple1: f2 = 1.5
                Row.of(2., "d", 1., 1., 0., 1, "l0"));

        DataStream<Row> input = tEnv.toDataStream(inputTable);

        // Same pipeline as testSimple1.
        System.out.println(IteratorUtils.toList(input
                .windowAll(EndOfStreamWindows.get())
                .reduce((ReduceFunction<Row>) (row, t1) -> row)
                .executeAndCollect()));
    }
}
```

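```java
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * A {@link WindowAssigner} that assigns all elements of a bounded input stream into one window
 * pane. The results are emitted once the input stream has ended.
 */
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // One window spanning the whole time range; its maxTimestamp() is Long.MAX_VALUE - 1.
        return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // Fires once the watermark reaches the window's max timestamp,
        // which in practice means the final MAX_WATERMARK.
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "EndOfStreamWindows()";
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```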

On Fri, Nov 5, 2021 at 4:29 PM Timo Walther <twal...@apache.org> wrote:

    Hi Yunfeng,

    By default, fromDataStream does not propagate watermarks into the Table
    API, because the Table API needs a time attribute in the schema that
    corresponds to the watermarks. A time attribute will also be put back
    into the stream record during toDataStream.

    Please take a look at:

    
    https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream

    Example 4 in particular should solve your use case:

    // === EXAMPLE 4 ===

    // derive all physical columns automatically
    // but access the stream record's timestamp for creating a rowtime attribute column
    // also rely on the watermarks generated in the DataStream API

    // we assume that a watermark strategy has been defined for `dataStream` before
    // (not part of this example)
    Table table =
          tableEnv.fromDataStream(
              dataStream,
              Schema.newBuilder()
                  .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                  .watermark("rowtime", "SOURCE_WATERMARK()")
                  .build());
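Example 4 assumes a watermark strategy was already assigned to `dataStream` in the DataStream API, and `SOURCE_WATERMARK()` then relies on the watermarks that strategy generates. As a rough, Flink-free illustration of those values, the hypothetical `OutOfOrdernessWatermarkModel` below assumes the behavior of a bounded-out-of-orderness strategy (watermark = largest timestamp seen minus the allowed out-of-orderness minus 1 ms, with a monotonous-timestamps strategy corresponding to an out-of-orderness of 0). It is a sketch of the arithmetic only, not Flink code.

```java
/**
 * Hypothetical model of a bounded-out-of-orderness watermark generator.
 * Assumption: watermark = maxTimestampSeen - outOfOrdernessMillis - 1,
 * and the initial state yields a watermark of Long.MIN_VALUE.
 */
class OutOfOrdernessWatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    OutOfOrdernessWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // chosen so that the first watermark is exactly Long.MIN_VALUE (no underflow)
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called for every record with its event timestamp. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark that would be emitted next. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        OutOfOrdernessWatermarkModel model = new OutOfOrdernessWatermarkModel(500);
        System.out.println(model.currentWatermark() == Long.MIN_VALUE); // true
        for (long ts : new long[] {1000L, 3000L, 2000L}) {
            model.onEvent(ts); // the late record (2000) does not move the watermark back
        }
        System.out.println(model.currentWatermark()); // 2499
    }
}
```

For realistic timestamps this value stays far below Long.MAX_VALUE, so a window spanning [Long.MIN_VALUE, Long.MAX_VALUE) such as EndOfStreamWindows can only fire on a final MAX_WATERMARK at the end of the input.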

    I hope this helps.

    Regards,
    Timo


    On 04.11.21 12:00, Yunfeng Zhou wrote:
     > Hi,
     >
     > I found that if I convert a DataStream into a Table and back into a
     > DataStream, the watermarks of the stream are lost. As shown in the
     > program below, the TestOperator before the conversion has its
     > processWatermark() method triggered and the watermark value printed,
     > but the one after the conversion does not.
     >
     > Is my observation correct? If so, is this the expected behavior of the
     > conversion API? My current work requires converting a table into a
     > DataStream and applying a window operation to it, but this problem
     > prevents me from creating a window.
     >
     > Regards,
     > Yunfeng
     >
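     > ```java
     > import org.apache.commons.collections.IteratorUtils;
     > import org.apache.flink.api.common.typeinfo.TypeInformation;
     > import org.apache.flink.api.java.typeutils.RowTypeInfo;
     > import org.apache.flink.streaming.api.datastream.DataStream;
     > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     > import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
     > import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
     > import org.apache.flink.streaming.api.watermark.Watermark;
     > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
     > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
     > import org.apache.flink.types.Row;
     >
     > public class SimpleTest {
     >     public static void main(String[] args) throws Exception {
     >         StreamExecutionEnvironment env =
     >                 StreamExecutionEnvironment.createLocalEnvironment();
     >         env.setParallelism(1);
     >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
     >
     >         DataStream<Row> input = env.fromElements(Row.of(1));
     >
     >         // Operator before the conversion: prints every element and watermark it sees.
     >         input = input.transform(
     >                 "TestOperator",
     >                 new RowTypeInfo(
     >                         new TypeInformation[] {TypeInformation.of(Integer.class)},
     >                         new String[] {"f0"}),
     >                 new TestOperator("0"));
     >
     >         // Round trip: DataStream -> Table -> DataStream.
     >         input = tEnv.toDataStream(tEnv.fromDataStream(input));
     >
     >         // Operator after the conversion.
     >         input = input.transform(
     >                 "TestOperator",
     >                 new RowTypeInfo(
     >                         new TypeInformation[] {TypeInformation.of(Integer.class)},
     >                         new String[] {"f0"}),
     >                 new TestOperator("1"));
     >
     >         System.out.println(IteratorUtils.toList(input.executeAndCollect()));
     >     }
     >
     >     private static class TestOperator extends AbstractStreamOperator<Row>
     >             implements OneInputStreamOperator<Row, Row> {
     >         private final String prefix;
     >
     >         private TestOperator(String prefix) {
     >             this.prefix = prefix;
     >         }
     >
     >         @Override
     >         public void processElement(StreamRecord<Row> streamRecord) throws Exception {
     >             System.out.println(prefix + streamRecord.getValue());
     >             output.collect(streamRecord);
     >         }
     >
     >         @Override
     >         public void processWatermark(Watermark mark) throws Exception {
     >             super.processWatermark(mark);
     >             System.out.println(prefix + mark.toString());
     >         }
     >     }
     > }
     > ```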

