Hey guys,
As far as I know, the timestamp field of a StreamRecord instance holds the
event time assigned by the assignTimestampsAndWatermarks method when the job's
time characteristic is set to event time. My confusion is that the timestamp
is not carried through the different operators the way I expect.
For example, the Map operator, implemented by the StreamMap class:
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
The FlatMap operator, implemented by StreamFlatMap:
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
The aggregation operator, which I believe is implemented by StreamGroupedReduce:
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();
        if (currentValue != null) {
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            values.update(value);
            output.collect(element.replace(value));
        }
    }
And the window operator, implemented by WindowOperator:
    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext,
                contents, timestampedCollector);
    }
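To make the pattern I am relying on concrete, here is a minimal plain-Java
sketch that runs without Flink on the classpath. It is only a model:
MiniRecord, map and emitWithAbsoluteTimestamp are hypothetical names, and they
copy just the replace / setAbsoluteTimestamp behaviour visible in the snippets
above, not the real classes.

```java
import java.util.function.Function;

/** Hypothetical, simplified stand-in for Flink's StreamRecord (not the real class). */
final class MiniRecord<T> {
    private final T value;
    private final long timestamp;

    MiniRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /** Swaps the value but keeps the timestamp, mirroring element.replace(value). */
    <R> MiniRecord<R> replace(R newValue) {
        return new MiniRecord<>(newValue, timestamp);
    }

    T getValue() { return value; }

    long getTimestamp() { return timestamp; }
}

class TimestampPropagationSketch {
    /** Map-style operator: the output inherits the input record's timestamp. */
    static <I, O> MiniRecord<O> map(MiniRecord<I> in, Function<I, O> f) {
        return in.replace(f.apply(in.getValue()));
    }

    /** Window-style emission: the result is stamped with an explicit timestamp. */
    static <T> MiniRecord<T> emitWithAbsoluteTimestamp(T result, long maxTimestamp) {
        return new MiniRecord<>(result, maxTimestamp);
    }

    public static void main(String[] args) {
        MiniRecord<Integer> in = new MiniRecord<>(1, 1000L);
        MiniRecord<Integer> mapped = map(in, v -> v + 10);
        System.out.println(mapped.getValue() + " @ " + mapped.getTimestamp());

        MiniRecord<String> windowed = emitWithAbsoluteTimestamp("sum", 4999L);
        System.out.println(windowed.getValue() + " @ " + windowed.getTimestamp());
    }
}
```

In this model every emitted record carries a timestamp, either inherited from
the input or set explicitly, which is what I expected from the SQL result too.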
All of the operators above pass a timestamp on to the StreamRecord they emit.
I then wrote a very simple SQL query, select a + 10, b, c from tb. However,
when I convert the result to a stream with toAppendStream or toRetractStream,
the timestamp of each StreamRecord is null, as printed by Context.timestamp()
in a ProcessFunction.
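The null I see would be consistent with the converted records carrying no
timestamp at all. Below is a small plain-Java model of that situation. It is a
guess on my side, not something I have verified in the Table API code:
TimestampedValue, contextTimestamp and convertDroppingTimestamp are
hypothetical names, and the "conversion" here simply builds a new record
without copying the timestamp.

```java
/** Hypothetical record whose timestamp may be absent (null means "not set"). */
final class TimestampedValue<T> {
    final T value;
    final Long timestamp;

    TimestampedValue(T value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

class NullTimestampSketch {
    /** Models a context lookup that yields null when the record has no timestamp. */
    static Long contextTimestamp(TimestampedValue<?> record) {
        return record.timestamp;
    }

    /** Assumed behaviour: a conversion step that emits a fresh record, timestamp not copied. */
    static <T> TimestampedValue<T> convertDroppingTimestamp(TimestampedValue<T> in) {
        return new TimestampedValue<>(in.value, null);
    }

    public static void main(String[] args) {
        TimestampedValue<Integer> source = new TimestampedValue<>(11, 1000L);
        System.out.println(contextTimestamp(source));

        TimestampedValue<Integer> converted = convertDroppingTimestamp(source);
        System.out.println(contextTimestamp(converted));
    }
}
```

If something like this happens inside the conversion, the downstream
ProcessFunction would see exactly the null I am observing.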
Best regards,
Dongyang Yao