Hi Dom,

Elements emitted from a ProcessingTime timer in a BroadcastProcessFunction or KeyedCoProcessFunction have their timestamps erased. So you have to call `assignTimestampsAndWatermarks` again after that operator, or use an EventTime timer instead.
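A minimal sketch of the first option, assuming the Flink 1.10-era Scala DataStream API that was current for this thread. `Event`, its `ts` field, the 5-second out-of-orderness bound, and the helper name `reassignTimestamps` are all hypothetical and only there for illustration; the real record type and bound depend on the job.

```scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type: `ts` is the event time in epoch milliseconds,
// copied into the record by the process function when it builds its output.
case class Event(key: String, ts: Long, value: String)

object ReassignTimestampsSketch {

  // Re-attach event-time timestamps to a stream whose elements were emitted
  // from a processing-time timer and therefore carry no timestamp.
  // The 5-second bound is an assumption; it should be large enough to cover
  // the extra delay introduced by the timer.
  def reassignTimestamps(delayed: DataStream[Event]): DataStream[Event] =
    delayed.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(5)) {
        // Read the timestamp back from the field carried inside the record.
        override def extractTimestamp(element: Event): Long = element.ts
      })
}
```

The helper would be applied to the output of the process function, before the stream is converted to a Table with a rowtime attribute. This only works if the record still carries its event time as a field, since the timestamp attached by Flink is gone at that point. On Flink 1.11 and later the same step is normally written with `WatermarkStrategy` rather than `BoundedOutOfOrdernessTimestampExtractor`.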
Best,
Jark

On Thu, 19 Mar 2020 at 16:40, Dominik Wosiński <wos...@gmail.com> wrote:

> Yes, I understand this completely, but my question is a little bit
> different.
>
> The issue is that if I have something like:
>
>     val firstStream = dataStreamFromKafka
>       .assignTimestampsAndWatermarks(...)
>     val secondStream = otherStreamFromKafka
>       .assignTimestampsAndWatermarks(...)
>       .broadcast(...)
>
> and then do something like:
>
>     firstStream.keyBy(...).connect(secondStream)
>       .process(someBroadcastProcessFunction)
>
> Now, I only select one field from the second stream, and this is *not the
> timestamp field*, and from the first stream I select all fields *including
> the timestamp* (in the process function, when creating a new record).
>
> Then everything works like a charm and there are no issues. But if I register
> a ProcessingTime timer in this *someBroadcastProcessFunction* and any
> element is produced from the *onTimer* function, then I get the issue
> described above.
>
> Best Regards,
> Dom.
>
> On Thu, 19 Mar 2020 at 02:41, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Dom,
>>
>> If you are converting a DataStream to a Table with a rowtime attribute,
>> then the DataStream should hold event-time timestamps.
>> For example, call `assignTimestampsAndWatermarks` before converting to a
>> table. You can find more details in the doc [1].
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>>
>> On Thu, 19 Mar 2020 at 02:38, Dominik Wosiński <wos...@gmail.com> wrote:
>>
>>> Hey,
>>> I just wanted to ask one thing about timestamps. Currently, if
>>> I have a KeyedBroadcastProcessFunction followed by a Temporal Table Join, it
>>> works like a charm. But say I want to delay emitting some of the results
>>> for some reason. If I call *registerProcessingTimeTimer* and any
>>> elements are emitted in the *onTimer* call, then the timestamps are erased,
>>> meaning that I simply get:
>>>
>>> Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
>>>     at DataStreamSourceConversion$10.processElement(Unknown Source)
>>>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>>     ... 23 more
>>>
>>> Is that the expected behavior? I haven't seen it described anywhere
>>> before and I wasn't able to find any docs specifying this.
>>>
>>> Thanks in advance,
>>> Best Regards,
>>> Dom.