Hello Matthias,

When I remove all the watermark strategies, nothing is processed. For example, when I use WatermarkStrategy.noWatermarks() instead of the one I build, nothing seems to happen at all.

Also, when I skip the part where I apply wmStrategy to create tuple4dswm:

DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

nothing is processed.

Regards,
Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Hanspeter,
>
> Let me relate some hints that might help you get the concepts clearer.
>
> From your description I make the following assumptions where you are not specific enough (please confirm or correct in your answer):
>
> 1. You store incoming events in state per transaction_id, to be sorted/aggregated (min/max time) by event time later on.
> 2. So far you used a session window to determine the point in time when to emit the stored/enriched/sorted events.
> 3. Watermarks are generated with bounded out-of-orderness.
> 4. You use session windows with a specific gap.
> 5. In your experiment you only ever send 1000 events and then stop producing incoming events.
>
> Now to your questions:
>
> - For processing-time session windows, watermarks play no role whatsoever: you simply assume that you have seen all events belonging to a single transaction id if the last such event for that transaction id was processed sessionWindowGap milliseconds ago.
> - Therefore you see all enriched incoming events at the latest sessionWindowGap ms after the last incoming event (+ some latency).
> - In event-time mode, and respectively with event-time session windows, the situation is exactly the same, except that processing time plays no role.
> - A watermark means (ideally) that no event older than the watermark time ever follows the watermark (which itself is a meta-event that flows with the proper events on the same channels).
> - In order for a session-gap window to forward the enriched events, the window operator needs to receive a watermark that is sessionWindowGap milliseconds beyond the latest incoming event (in terms of the respective event time).
> - In order to generate a new watermark that triggers this last session window, the watermark generator needs to encounter an (any) event that has a timestamp of (<latest event in session window> + outOfOrderness + sessionWindowGap + 1 ms).
> - Remember, the watermark generator never generates watermarks based on processing time, but only based on the timestamps it has seen in events actually encountered.
> - Coming back to your idleness configuration: it only means that the incoming stream becomes idle == timeless after a while, i.e. watermarks won't make progress from this stream, and it tells all downstream operators so.
> - An idleness specification is only useful if the respective operator has another source of valid watermarks (e.g. after a union of two streams, one active and one idle). This is not your case.
>
> I hope this clarifies your situation.
>
> Cheers
>
> Thias
>
> From: HG <hanspeter.sl...@gmail.com>
> Sent: Wednesday, 16 March 2022 10:06
> To: user <user@flink.apache.org>
> Subject: Watermarks event time vs processing time
>
> Hi,
>
> I read events in JSON format from a Kafka topic.
> These events contain a handling time (aka event time) in epoch milliseconds, a transaction_id and a large nested JSON structure.
> I need to group the events by transaction_id, order them by handling time and calculate the differences in handling time.
> The events are updated with this calculated elapsed time and pushed further.
> So all events that go in should come out with the elapsed time added.
>
> For testing I use events that are old (so the handling time is nowhere near the wall-clock time).
> Initially I used EventTimeSessionWindows, but somehow the processing did not run as expected.
> When I pushed 1000 events, eventually only 800 or so would appear at the output.
> This was resolved by switching to ProcessingTimeSessionWindows.
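The 800-of-1000 symptom is consistent with Matthias's explanation earlier in the thread: once the producer stops, the watermark can only reach the largest event timestamp seen minus outOfOrderness minus 1 ms, so the newest sessions never close. The stdlib-only Java sketch below models this; it is a simplification, not Flink code. It borrows the formulas of Flink's BoundedOutOfOrdernessWatermarks (watermark = max timestamp - outOfOrderness - 1) and EventTimeTrigger (fire once the watermark reaches the window's maxTimestamp(), i.e. end - 1), and it assumes no late events, no allowedLateness, and an unbounded source that never emits a final end-of-input watermark. The class name, its methods and the sample timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (not Flink code) of event-time session windows driven by a
 * bounded-out-of-orderness watermark over a finite batch of events.
 * Assumptions: no event is late, allowedLateness is ignored, and the source stays
 * open, so no final end-of-input watermark is ever emitted.
 */
public class EventTimeSessions {

    /** Same formula as Flink's BoundedOutOfOrdernessWatermarks: max timestamp - outOfOrderness - 1. */
    static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    /** Smallest event timestamp whose watermark reaches the session's maxTimestamp (end - 1). */
    static long requiredTimestamp(long lastEventTs, long outOfOrdernessMs, long gapMs) {
        return lastEventTs + outOfOrdernessMs + gapMs;
    }

    /** Counts the events that land in a session whose trigger fires under the last watermark. */
    static int emittedEvents(Map<String, List<Long>> tsByKey, long outOfOrdernessMs, long gapMs) {
        long maxTs = Long.MIN_VALUE;
        for (List<Long> timestamps : tsByKey.values()) {
            for (long ts : timestamps) maxTs = Math.max(maxTs, ts);
        }
        if (maxTs == Long.MIN_VALUE) return 0; // no events, no watermark
        long lastWatermark = watermark(maxTs, outOfOrdernessMs);

        int emitted = 0;
        for (List<Long> timestamps : tsByKey.values()) {
            List<Long> sorted = new ArrayList<>(timestamps);
            Collections.sort(sorted);
            int count = 0;             // events in the current session
            long end = Long.MIN_VALUE; // exclusive end of the current session window
            for (long ts : sorted) {
                if (count > 0 && ts > end) { // windows no longer touch: previous session is complete
                    if (end - 1 <= lastWatermark) emitted += count;
                    count = 0;
                }
                end = ts + gapMs; // each event extends its session to ts + gap
                count++;
            }
            if (count > 0 && end - 1 <= lastWatermark) emitted += count;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical batch: timestamps per transaction_id, gap 10 s, out-of-orderness 5 s.
        Map<String, List<Long>> batch = new LinkedHashMap<>();
        batch.put("tx-1", List.of(0L, 3_000L));
        batch.put("tx-2", List.of(50_000L, 52_000L));
        batch.put("tx-3", List.of(100_000L)); // the newest session never closes
        System.out.println("emitted " + emittedEvents(batch, 5_000L, 10_000L) + " of 5 events");
        // prints: emitted 4 of 5 events
    }
}
```

Under these assumptions requiredTimestamp() returns lastEvent + outOfOrderness + gap as the smallest timestamp that closes a session; the figure in Matthias's reply is 1 ms above that and therefore also works. Adding one more event at 115_000 ms (for any key) would close tx-3's session.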
> My thought was then that I could remove the watermark strategies (watermarks with timestamp assigners on the handling time) for the Kafka input stream and the data stream.
> However, this was not the case.
>
> Can anyone enlighten me as to why the watermark strategies are still needed?
>
> Below is the code:
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type", trustStoreType)
>         .setProperty("ssl.truststore.password", trustStorePassword)
>         .setProperty("ssl.truststore.location", trustStoreLocation)
>         .setProperty("security.protocol", securityProtocol)
>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>         .setGroupId(inputGroupId)
>         .setClientIdPrefix(clientId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
>
> /* A watermark is needed to prevent duplicates! */
> WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
>         .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>         .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
>             @Override
>             public long extractTimestamp(ObjectNode element, long eventTime) {
>                 return element.get("value").get("handling_time").asLong();
>             }
>         });
>
> /* Use the watermark strategy to create a datastream */
> DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");
>
> /* Split the ObjectNode into a Tuple4 */
> DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());
>
> WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
>         .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
>             @Override
>             public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
>                 return element.f0;
>             }
>         });
>
> DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
> DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
>         .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
>             @Override
>             public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
>                 return value.f2;
>             }
>         })
>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
>         .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
>         .process(new MyProcessWindowFunction());
>
> KafkaSink<String> kSink = KafkaSink.<String>builder()
>         .setBootstrapServers(outputBrokers)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic(kafkaOutputTopic)
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .build()
>         )
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .build();
>
> // Sink to the Kafka topic
> tuple4DsWmKeyedbytr.sinkTo(kSink);
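Matthias's last two points concern the withIdleness(...) calls in the strategies above. The stdlib-only Java sketch below is a simplified model of how an operator with several inputs (for example after a union) derives its event-time watermark: it takes the minimum over inputs that are still active and leaves idle inputs out. It is not Flink's implementation; the record, the sample values and the choice to model the all-idle case as "no new watermark" are assumptions made for illustration.

```java
import java.util.List;
import java.util.OptionalLong;

/**
 * Simplified model (not Flink's implementation) of combining the watermarks of
 * several inputs, some of which may have been marked idle.
 */
public class IdlenessModel {

    /** Hypothetical per-input state: the last watermark it delivered and whether it is idle. */
    record Input(String name, long watermark, boolean idle) {}

    /**
     * The operator's watermark is the minimum over active inputs; idle inputs are ignored.
     * If every input is idle, nothing drives the watermark forward (modelled as empty).
     */
    static OptionalLong combinedWatermark(List<Input> inputs) {
        return inputs.stream()
                .filter(input -> !input.idle())
                .mapToLong(Input::watermark)
                .min();
    }

    public static void main(String[] args) {
        // Union of an active and an idle stream: idleness lets the active stream drive time.
        List<Input> union = List.of(
                new Input("active", 10_000L, false),
                new Input("idle", 2_000L, true));
        System.out.println(combinedWatermark(union)); // OptionalLong[10000]

        // A single input that has gone idle: no timestamps, hence no watermark progress.
        List<Input> single = List.of(new Input("kafka", 2_000L, true));
        System.out.println(combinedWatermark(single)); // OptionalLong.empty
    }
}
```

Marking an input idle only removes it from the minimum; it never supplies a timestamp of its own. With an active partner input, that lets the active stream drive time forward. With a single, idle input, nothing new arrives that could trigger an event-time window.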