Forget what I said before. I just tested the behavior, and there seems
to be a bug in the conversion logic.
I opened https://issues.apache.org/jira/browse/FLINK-21013 to track it.
Thanks for reaching out to us.
Regards,
Timo
On 18.01.21 15:37, Timo Walther wrote:
Hi,
In SQL, event time is not part of the StreamRecord but a column in the
table. Thus, you need to extract it and specify the column name/location
when converting to the Table API:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
When converting back to the DataStream API, a single time column will be
written back to the StreamRecord timestamp if the SQL query preserves
event time. For example, this is not the case for `toRetractStream`.
Usually `toAppendStream` and event time play nicely together.
Regards,
Timo
On 18.01.21 13:48, DongyangYao wrote:
Hey guys,
As far as I know, the timestamp field of a StreamRecord instance is the
event time assigned by the assignTimestampsAndWatermarks method if I have
set the time characteristic of the job to event time. My confusion is
that the timestamp is not passed through different operators as I expect.
E.g., the Map operator, implemented by the StreamMap class:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // replace() swaps the value in place and keeps the record's timestamp
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

The FlatMap operator, implemented by StreamFlatMap:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // records emitted by flatMap take over the input element's timestamp
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }

The aggregation operator, probably implemented by StreamGroupedReduce:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();
        if (currentValue != null) {
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            // the input record is reused, so its timestamp is kept
            output.collect(element.replace(reduced));
        } else {
            values.update(value);
            output.collect(element.replace(value));
        }
    }

Also, the window operator, implemented by WindowOperator:

    private void emitWindowContents(W window, ACC contents) throws Exception {
        // every result of this window is stamped with window.maxTimestamp()
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext,
                contents, timestampedCollector);
    }

All the operators above pass the timestamp on to the emitted StreamRecord
instance. However, when I write a very simple SQL query, i.e., select a +
10, b, c from tb, and get the result stream via the toAppendStream or
toRetractStream method, I find that the timestamp of the StreamRecord is
null, as printed by Context.timestamp() in a ProcessFunction.
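
To make the expectation concrete, here is a minimal, self-contained sketch of the propagation seen in the StreamMap snippet above. SketchRecord and mapElement are hypothetical stand-ins written only for illustration, not Flink classes. The sketch assumes that replace() swaps the value and leaves the timestamp untouched, and it returns null for a missing timestamp purely to mirror what Context.timestamp() reports.

```java
import java.util.function.Function;

/** Simplified stand-in for a stream record (hypothetical, NOT Flink's StreamRecord). */
final class SketchRecord<T> {
    private T value;
    private long timestamp;
    private boolean hasTimestamp;

    /** Creates a record without a timestamp. */
    SketchRecord(T value) {
        this.value = value;
    }

    /** Creates a record carrying an event-time timestamp. */
    SketchRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    T getValue() {
        return value;
    }

    /** Returns the timestamp, or null if none was set (a simplification). */
    Long getTimestamp() {
        return hasTimestamp ? timestamp : null;
    }

    /** Swaps the value in place; the timestamp fields are left untouched. */
    @SuppressWarnings("unchecked")
    <X> SketchRecord<X> replace(X newValue) {
        this.value = (T) newValue;
        return (SketchRecord<X>) this;
    }
}

class TimestampPropagationSketch {

    /** Mirrors the shape of the map operator: apply the function, reuse the record. */
    static <IN, OUT> SketchRecord<OUT> mapElement(
            SketchRecord<IN> element, Function<IN, OUT> fn) {
        return element.replace(fn.apply(element.getValue()));
    }

    public static void main(String[] args) {
        // A record with a timestamp keeps it across the map step.
        SketchRecord<Integer> stamped = new SketchRecord<>(1, 1_000L);
        SketchRecord<Integer> mapped = mapElement(stamped, v -> v + 10);
        System.out.println(mapped.getValue() + " @ " + mapped.getTimestamp());

        // A record that never had a timestamp still has none afterwards.
        SketchRecord<Integer> bare = new SketchRecord<>(1);
        SketchRecord<Integer> mappedBare = mapElement(bare, v -> v + 10);
        System.out.println(mappedBare.getValue() + " @ " + mappedBare.getTimestamp());
    }
}
```

In this model a map step can only lose the timestamp if the incoming record never had one, which is why I expected the record coming out of the SQL query to still carry it.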
Best regards,
Dongyang Yao