Forget what I said before, I just tested the behavior and it seems there is a bug in the conversion logic.

I opened https://issues.apache.org/jira/browse/FLINK-21013

Thanks for reaching out to us.

Regards,
Timo

On 18.01.21 15:37, Timo Walther wrote:
Hi,

in SQL event time is not part of the StreamRecord but a column in the table. Thus, you need to extract it and specify the column name/location when converting to Table API:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion

When converting back to DataStream API, a single time column will be converted back to StreamRecord if the SQL query preserves event-time. For example, this is not the case for `toRetractStream`. Usually toAppendStream and event-time play nicely together.

Regards,
Timo


On 18.01.21 13:48, DongyangYao wrote:
Hey guys,
As far as I know, the timestamp field of StreamRecord instance is the event time assgined by assignTimestampsAndWatermarks method if I have set the time characteristic of job to event time. My confusion is that the timestamp does not transfer through different operators as I expect.
E.g., Map operator implemented by StreamMap class:
@Override public void processElement(StreamRecord<IN> element) throws Exception { output.collect(element.replace(userFunction.map(element.getValue()))); }
Flat Map operator by StreamFlatMap:
@Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); userFunction.flatMap(element.getValue(), collector); }
Probably, Agg operator by StreamGroupedReduce:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
IN currentValue = values.value();

if (currentValue != null) {
IN reduced = userFunction.reduce(currentValue, value);
values.update(reduced);
output.collect(element.replace(reduced));
} else {
values.update(value);
output.collect(element.replace(value));
}
}
Also, window operator by WindowOperator:
private void emitWindowContents(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); processContext.window = window; userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector); } All the operators above will deliver the timestamp to new StreamRecord instance. Then I just write a very simple SQL query, i.e., select a + 10, b, c from tb, however, when I get the result stream by toAppendStream or toRetractStream method, I find the timestamp of StreamRecord is null which is printed by Context.timestamp() in a ProcessFunction.

Best regard,
Dongyang Yao




Reply via email to