Hello Matthias,

When I remove all the watermark strategies, nothing is processed at all.
For example, when I use WatermarkStrategy.noWatermarks() instead of the one
I built, nothing seems to happen.
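That is, roughly this (a sketch of what I tried; the rest of the pipeline
stays as in the code further below):

        DataStream<ObjectNode> ds = env.fromSource(source,
                WatermarkStrategy.<ObjectNode>noWatermarks(), "Kafka Source");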

Also, when I skip the step where I apply wmStrategy to create tuple4dswm:

        DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
                tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Hanspeter,
>
>
>
> Let me share some hints that might help make the concepts clearer.
>
>
>
> From your description I make the following assumptions where you are not
> specific enough (please confirm or correct in your answer):
>
>    1. You store incoming events in state per transaction_id, to be
>    sorted/aggregated (min/max time) by event time later on
>    2. So far you used a session window to determine the point in time
>    when to emit the stored/enriched/sorted events
>    3. Watermarks are generated with bounded out of orderness
>    4. You use session windows with a specific gap
>    5. In your experiment you only ever send 1000 events and then stop
>    producing incoming events
>
>
>
> Now to your questions:
>
>    - For processing time session windows, watermarks play no role
>    whatsoever; you simply assume that you have seen all events belonging to a
>    single transaction id if the last such event for that transaction id
>    was processed sessionWindowGap milliseconds ago
>    - Therefore you see all enriched incoming events at the latest
>    sessionWindowGap ms after the last incoming event (plus some latency)
>    - In event time mode, and hence for event time session windows, the
>    situation is exactly the same, except that processing time plays no role
>    - A watermark means (ideally) that no event older than the watermark
>    time ever follows the watermark (which itself is a meta-event that flows
>    with the proper events on the same channels)
>    - In order for a session gap window to forward the enriched events, the
>    window operator needs to receive a watermark that is sessionWindowGap
>    milliseconds beyond the latest incoming event (in terms of the respective
>    event time)
>    - In order to generate a new watermark that triggers this last session
>    window, the watermark generator needs to encounter an (any) event that
>    has a timestamp of (<latest event in session window> + outOfOrderness
>    + sessionWindowGap + 1ms); see the sketch after this list
>    - Remember, the watermark generator never generates watermarks based
>    on processing time, but only based on the timestamps it has seen in
>    events actually encountered
>    - Coming back to your idleness configuration: it only means that the
>    incoming stream becomes idle == timeless after a while, i.e. watermarks
>    won't make progress from this stream, and it tells all downstream
>    operators so
>    - An idleness specification is only useful if the respective operator
>    has another source of valid watermarks (i.e. after a union of two
>    streams, one active/one idle). This is not your case
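>
> To make the arithmetic concrete, here is a minimal, self-contained sketch
> (the class name SessionFlushDemo, the concrete numbers, and the printed
> message are illustrative only; outOfOrderness = 2 s, sessionGap = 5 s):
>
> import java.time.Duration;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class SessionFlushDemo {
>     public static void main(String[] args) throws Exception {
>         final long outOfOrdernessMs = 2_000L; // bounded out-of-orderness
>         final long sessionGapMs = 5_000L;     // session window gap
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.fromElements(
>                 Tuple2.of(1_000L, "tx1"),
>                 Tuple2.of(2_000L, "tx1"),
>                 // Sentinel at 2000 + 2000 (outOfOrderness) + 5000 (gap) + 1 = 9001:
>                 // only now does the watermark (9001 - 2000 - 1 = 7000) reach the
>                 // end of the "tx1" session window (2000 + 5000), so it can fire.
>                 Tuple2.of(9_001L, "flush"))
>                 .assignTimestampsAndWatermarks(WatermarkStrategy
>                         .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofMillis(outOfOrdernessMs))
>                         .withTimestampAssigner((event, ts) -> event.f0))
>                 .keyBy(event -> event.f1)
>                 .window(EventTimeSessionWindows.withGap(Time.milliseconds(sessionGapMs)))
>                 .process(new ProcessWindowFunction<Tuple2<Long, String>, String, String, TimeWindow>() {
>                     @Override
>                     public void process(String key, Context ctx,
>                             Iterable<Tuple2<Long, String>> events, Collector<String> out) {
>                         out.collect("session window fired for key " + key);
>                     }
>                 })
>                 .print();
>
>         // Caveat: a bounded source such as fromElements emits a final
>         // MAX_WATERMARK on shutdown, which fires all remaining windows anyway;
>         // with an unbounded Kafka source only the sentinel advances the watermark.
>         env.execute("session flush demo");
>     }
> }
>
> Only the sentinel event un-blocks the "tx1" window in a long-running job:
> without it, the watermark stays at 2000 - 2000 - 1 and the session window
> never fires.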
>
>
>
> I hope this clarifies your situation.
>
>
>
> Cheers
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* HG <hanspeter.sl...@gmail.com>
> *Sent:* Wednesday, 16 March 2022 10:06
> *To:* user <user@flink.apache.org>
> *Subject:* Watermarks event time vs processing time
>
>
>
>
>
>
> Hi,
>
>
>
> I read events in JSON format from a Kafka topic.
>
> These events contain a handling time (aka event time) in epoch
> milliseconds, a transaction_id and a large nested JSON structure.
>
> I need to group the events by transaction_id, order them by handling time,
> and calculate the differences in handling time.
>
> The events are updated with this calculated elapsed time and pushed
> further.
>
> So all events that go in should come out with the elapsed time added.
>
>
>
> For testing I use events that are old (so the handling time is nowhere
> near the wall clock time).
>
> Initially I used EventTimeSessionWindows but somehow the processing did
> not run as expected.
>
> When I pushed 1000 events, eventually only 800 or so would appear at the
> output. This was resolved by switching to ProcessingTimeSessionWindows.
> My thought was then that I could remove the watermark strategies with their
> timestamp assigners (on handling time) for the Kafka input stream and the
> data stream.
>
> However, this was not the case.
>
>
>
> Can anyone enlighten me as to why the watermark strategies are still
> needed?
>
>
>
> Below is the code:
>
>
>
>         KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>                 .setProperties(kafkaProps)
>                 .setProperty("ssl.truststore.type", trustStoreType)
>                 .setProperty("ssl.truststore.password", trustStorePassword)
>                 .setProperty("ssl.truststore.location", trustStoreLocation)
>                 .setProperty("security.protocol", securityProtocol)
>                 .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>                 .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>                 .setGroupId(inputGroupId)
>                 .setClientIdPrefix(clientId)
>                 .setTopics(kafkaInputTopic)
>                 .setDeserializer(KafkaRecordDeserializationSchema.of(
>                         new JSONKeyValueDeserializationSchema(fetchMetadata)))
>                 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>                 .build();
>
>         /* A watermark is needed to prevent duplicates! */
>         WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
>                 .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>                 .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>                 .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
>                     @Override
>                     public long extractTimestamp(ObjectNode element, long eventTime) {
>                         return element.get("value").get("handling_time").asLong();
>                     }
>                 });
>
>         /* Use the watermark strategy to create a DataStream */
>         DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");
>
>         /* Split the ObjectNode into a Tuple4 */
>         DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());
>
>         WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
>                 .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>                 .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>                 .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
>                     @Override
>                     public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
>                         return element.f0;
>                     }
>                 });
>
>         DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
>                 tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
>         DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
>                 .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
>                     @Override
>                     public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
>                         return value.f2;
>                     }
>                 })
>                 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
>                 .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
>                 .process(new MyProcessWindowFunction());
>
>
>         KafkaSink<String> kSink = KafkaSink.<String>builder()
>                 .setBootstrapServers(outputBrokers)
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic(kafkaOutputTopic)
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>                 .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>                 .build();
>
>         // Sink to the Kafka topic
>         tuple4DsWmKeyedbytr.sinkTo(kSink);
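>
> (Splitter and MyProcessWindowFunction are not shown in this mail. For
> context, here is a hypothetical sketch of what MyProcessWindowFunction
> might do, assuming f0 holds the handling_time, f2 the transaction_id and
> f3 the payload; the actual implementation may differ:)
>
> import java.util.ArrayList;
> import java.util.Comparator;
> import java.util.List;
>
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class MyProcessWindowFunction extends
>         ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
>     @Override
>     public void process(String transactionId, Context context,
>             Iterable<Tuple4<Long, Long, String, String>> events, Collector<String> out) {
>         // Buffer and sort the events of this transaction by handling time (f0).
>         List<Tuple4<Long, Long, String, String>> sorted = new ArrayList<>();
>         events.forEach(sorted::add);
>         sorted.sort(Comparator.comparingLong(event -> event.f0));
>
>         // Emit each event with the elapsed time since its predecessor.
>         long previous = -1L;
>         for (Tuple4<Long, Long, String, String> event : sorted) {
>             long elapsed = previous < 0 ? 0L : event.f0 - previous;
>             previous = event.f0;
>             out.collect(event.f3 + " elapsed=" + elapsed); // f3: payload (assumed)
>         }
>     }
> }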
