Hi Jin,

If you are using one of the new FLIP-27 sources such as KafkaSource, 
per-partition (per-split) watermarking is a default feature built into 
SourceOperator. You might be hitting the bug described in FLINK-26018 [1]: 
during the first fetch of the source, records from the first split push the 
watermark far ahead, so records from the other splits are treated as late events.

[1] https://issues.apache.org/jira/browse/FLINK-26018
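
To make the effect concrete, here is a minimal, Flink-free simulation. It is only a sketch under stated assumptions: the class and method names are hypothetical, events are plain {splitId, timestamp} pairs, and it is neither the SourceOperator code nor a reproduction of the exact mechanics in FLINK-26018. It contrasts a watermark driven by whichever records have arrived so far with a watermark held back by the slowest split.

```java
import java.util.Arrays;

/** Flink-free simulation; all names here are hypothetical, not Flink classes. */
final class SplitWatermarkSketch {

    /** Watermark for a max timestamp: no more records with ts <= result are expected. */
    static long watermarkFor(long maxTimestamp, long maxOutOfOrderness) {
        // Long.MIN_VALUE stands for "no record seen yet, so no watermark yet".
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - maxOutOfOrderness - 1;
    }

    /** Late-record count when one max timestamp is tracked across all splits. */
    static int lateWithSharedWatermark(long[][] events, long maxOutOfOrderness) {
        long maxTimestamp = Long.MIN_VALUE;
        int late = 0;
        for (long[] event : events) { // event = {splitId, timestamp}
            long timestamp = event[1];
            if (timestamp <= watermarkFor(maxTimestamp, maxOutOfOrderness)) {
                late++;
            }
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
        return late;
    }

    /** Late-record count when the watermark is the minimum over per-split watermarks. */
    static int lateWithPerSplitWatermark(long[][] events, int numSplits, long maxOutOfOrderness) {
        long[] maxTimestampPerSplit = new long[numSplits];
        Arrays.fill(maxTimestampPerSplit, Long.MIN_VALUE);
        int late = 0;
        for (long[] event : events) {
            int split = (int) event[0];
            long timestamp = event[1];
            // A split that has produced nothing yet holds the watermark back.
            long slowest = Arrays.stream(maxTimestampPerSplit).min().getAsLong();
            if (timestamp <= watermarkFor(slowest, maxOutOfOrderness)) {
                late++;
            }
            maxTimestampPerSplit[split] = Math.max(maxTimestampPerSplit[split], timestamp);
        }
        return late;
    }

    public static void main(String[] args) {
        // The first fetch returns only split 0; split 1 arrives afterwards.
        long[][] events = {{0, 1000}, {0, 2000}, {0, 500000}, {1, 1000}, {1, 2000}};
        System.out.println("shared watermark, late records: "
                + lateWithSharedWatermark(events, 5));
        System.out.println("per-split watermark, late records: "
                + lateWithPerSplitWatermark(events, 2, 5));
    }
}
```

With this input, the shared variant counts both split-1 records as late, while the per-split variant counts none.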

Best regards,

Qingsheng


> On Apr 8, 2022, at 15:54, Jin Yi <j...@promoted.ai> wrote:
> 
> what should the code look like to verify we're using per-partition watermarks 
> if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> 
> we currently have it looking like:
> 
> streamExecutionEnvironment.fromSource(
>    KafkaSource.<T>builder().....build(),
>    watermarkStrategy,
>    "whatever",
>    typeInfo);
> 
> when running this job with the streamExecutionEnvironment parallelism set to 
> 1, and the kafka source having 30 partitions, i'm seeing weird behavior 
> where the first operator after this source consumes events out of order (and 
> therefore, violates watermarks).  the operator simply checks to see what 
> "type" of event something is and uses side outputs to output the 
> type-specific messages.  here's a snippet of the event timestamp going back 
> before the current watermark (the first instance of going backwards in time 
> is the entry with ts: 1649284171037):
> 
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> 2022-04-08 05:47:06,317 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
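
A small helper can pick the late entries out of a log excerpt like the one above. This is a sketch only: the class name is hypothetical, the regular expression assumes the "ts: ... watermark: ..." layout shown in these lines, and a record is counted as late when its timestamp is at or behind the watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for scanning "ts: X watermark: Y" log lines. */
final class LateRecordScan {

    private static final Pattern TS_AND_WATERMARK =
            Pattern.compile("ts: (\\d+) watermark: (\\d+)");

    /** Returns the timestamps that are at or behind the watermark logged with them. */
    static List<Long> lateTimestamps(List<String> logLines) {
        List<Long> late = new ArrayList<>();
        for (String line : logLines) {
            Matcher matcher = TS_AND_WATERMARK.matcher(line);
            if (!matcher.find()) {
                continue; // not a line in the expected layout
            }
            long timestamp = Long.parseLong(matcher.group(1));
            long watermark = Long.parseLong(matcher.group(2));
            // A watermark t means no more records with timestamp <= t are expected.
            if (timestamp <= watermark) {
                late.add(timestamp);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "- LogRequestFilter ts: 1649284271139 watermark: 1649284187140",
                "- LogRequestFilter ts: 1649284171037 watermark: 1649284187140");
        System.out.println(lateTimestamps(lines));
    }
}
```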
> 
> 
> 
> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <quietgol...@gmail.com> wrote:
> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you 
> for the help!
> 
> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <quietgol...@gmail.com> wrote:
> Thanks, Thias and Dongwon.
> 
> I'll keep debugging this with the idle watermark turned off.
> 
> Next TODOs:
> - Verify that we’re using per-partition watermarks.  Our code matches the 
> example but maybe something is disabling it.
> - Enable logging of partition-consumer assignment, to see if that is the 
> cause of the problem.
> - Look at adding flags to set the source parallelism to see if that fixes the 
> issue.
> 
> Yes, I've seen Flink talks on creating our own watermarks through Kafka.  
> Sounds like a good idea.
> 
> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> I totally agree with Schwalbe that per-partition watermarking allows # source 
> tasks < # kafka partitions. 
> 
> Otherwise, Dan, you should suspect other possibilities like what Schwalbe 
> said.
> 
> Best,
> 
> Dongwon
> 
> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias 
> <matthias.schwa...@viseca.ch> wrote:
> Hi Dan, Dongwon,
> 
>  
> 
> I share the opinion that when per-partition watermarking is enabled, you 
> should observe correct behavior … would be interesting to see why it does not 
> work for you.
> 
>  
> 
> I’d like to clear one tiny misconception here when you write:
> 
>  
> 
> >> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> You would expect to see glitches with watermarking when you enable idleness.
> 
> Idleness sort of trades watermark correctness for reduced latency when 
> processing timers (much simplified).
> 
> With idleness enabled you have no guarantees whatsoever as to the quality of 
> watermarks (which might be ok in some cases).
> 
> BTW we predominantly use a mix of fast and slow sources (that only update 
> once a day) with hand-tuned watermarking and late event processing, and 
> enabling idleness would break everything.
> 
>  
> 
> Oversights aside, things should work the way you implemented them.
> 
>  
> 
> One thing I could imagine to be a cause is
> 
>       • that over time the kafka partitions get reassigned to different 
> consumer subtasks, which would probably stress correct recalculation of 
> watermarks. Hence # partitions == # subtasks might reduce the problem
>       • can you enable logging of partition-consumer assignment, to see if 
> that is the cause of the problem
>       • also involuntary restarts of the job can cause havoc as this resets 
> watermarking
>  
> 
> I’ll be off next week, unable to take part in the active discussion …
> 
>  
> 
> Sincere greetings
> 
>  
> 
> Thias
> 
>  
> 
>  
> 
>  
> 
>  
> 
> From: Dan Hill <quietgol...@gmail.com> 
> Sent: Friday, March 18, 2022 08:23
> To: Dongwon Kim <eastcirc...@gmail.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Weird Flink Kafka source watermark behavior
> 
>  
> 
> I'll try forcing # source tasks = # partitions tomorrow.
> 
>  
> 
> Thank you, Dongwon, for all of your help!
> 
>  
> 
> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> I believe your job with per-partition watermarking should be working okay 
> even in a backfill scenario. 
> 
>  
> 
> BTW, is the problem still observed even with # source tasks = # partitions?
> 
>  
> 
> For committers:
> 
> Is there a way to confirm that per-partition watermarking is used in TM log?
> 
>  
> 
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
> 
> I hit this using event processing and no idleness detection.  The same issue 
> happens if I enable idleness.
> 
>  
> 
> My code matches the code example for per-partition watermarking.
> 
>  
> 
> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi Dan,
> 
>  
> 
> I'm quite confused as you already use per-partition watermarking.
> 
>  
> 
> What I meant in the reply is
> 
> - If you don't use per-partition watermarking, # tasks < # partitions can 
> cause the problem for backfill jobs.
> 
> - If you don't use per-partition watermarking, # tasks = # partitions is 
> going to be okay even for backfill jobs.
> 
> - If you use per-partition watermarking, # tasks < # partitions shouldn't 
> cause any problems unless you turn on the idleness detection.
> 
>  
> 
> Regarding the idleness detection, which is based on processing time, what is 
> your setting? If you set the value to 10 seconds for example, you'll face the 
> same problem unless the watermark of your backfill job catches up with real 
> time within 10 seconds. If you increase the value to 1 minute, your backfill 
> job should catch up with real time within 1 minute.
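
The interaction between a processing-time idle timeout and a paused partition can be sketched without Flink. Everything below is an assumption for illustration: the class name, the idea of tracking "last record seen at" per partition, and returning Long.MIN_VALUE when every partition is idle are simplifications, not Flink's implementation. The point is only that a partition declared idle stops holding the combined watermark back.

```java
/** Hypothetical, Flink-free model of idleness detection on per-partition watermarks. */
final class IdlenessSketch {

    /**
     * Combines per-partition watermarks, skipping partitions that have produced no
     * record for at least idleTimeoutMillis of processing time.
     */
    static long combinedWatermark(
            long[] partitionWatermarks,
            long[] lastRecordAtMillis,
            long nowMillis,
            long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            boolean idle = nowMillis - lastRecordAtMillis[i] >= idleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        // Simplification: "no watermark" when every partition is idle.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Partition 0 has read far ahead; partition 1 is paused 12 s into a backfill.
        long[] watermarks = {500000L, 1000L};
        long[] lastRecordAt = {19000L, 8000L};
        long now = 20000L;
        System.out.println("10 s timeout: " + combinedWatermark(watermarks, lastRecordAt, now, 10000L));
        System.out.println("60 s timeout: " + combinedWatermark(watermarks, lastRecordAt, now, 60000L));
    }
}
```

With the 10-second timeout the paused partition is treated as idle and the combined watermark jumps to 500000, so its remaining records would arrive late. With the 60-second timeout the combined watermark stays at 1000.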
> 
>  
> 
> Best,
> 
>  
> 
> Dongwon
> 
>  
> 
>  
> 
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
> 
> Thanks Dongwon!
> 
>  
> 
> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks 
> < # kafka partitions.  This should be called out in the docs or the bug 
> should be fixed.
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi Dan,
> 
>  
> 
> Do you use the per-partition watermarking explained in [1]?
> 
> I've also experienced a similar problem when running backfill jobs 
> specifically when # source tasks < # kafka partitions. 
> 
> - When # source tasks = # kafka partitions, the backfill job works as 
> expected.
> 
> - When # source tasks < # kafka partitions, a Kafka consumer consumes 
> multiple partitions. This case can destroy the per-partition patterns as 
> explained in [2].
> 
>  
> 
> Hope this helps.
> 
>  
> 
> p.s. If you plan to use the per-partition watermarking, be aware that 
> idleness detection [3] can cause another problem when you run a backfill job. 
> Kafka source tasks in a backfill job seem to read a batch of records from 
> Kafka and then wait for downstream tasks to catch up, which can be counted 
> as idleness.
> 
>  
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> 
> [2] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> 
> [3] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> 
>  
> 
> Best,
> 
>  
> 
> Dongwon
> 
>  
> 
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
> 
> I'm following the example from this section:
> 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
> 
> Other points
> 
> - I'm using the kafka timestamp as event time.
> 
> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
> 
> There are 12 Kafka partitions (to keep the structure similar to other low 
> traffic environments).
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
> 
> Hi.
> 
>  
> 
> I'm running a backfill from a kafka topic with very few records spread across 
> a few days.  I'm seeing a case where the records coming from a kafka source 
> have a watermark that's more recent (by hours) than the event time.  I 
> haven't seen this before when running this.  This violates what I'd assume 
> the kafka source would do.
> 
>  
> 
> Example problem:
> 
> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times are 
> separated by a longer time period.
> 
> 2.  My first operator after the FlinkKafkaConsumer sees:
> 
>    context.timestamp() = 1000
> 
>    context.timerService().currentWatermark() = 500000
> 
>  
> 
> Details about how I'm running this:
> 
> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> 
> - I'm using FlinkKafkaConsumer
> 
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness 
> settings.
> 
> - I'm running similar code in all the environments.  The main difference is 
> low traffic.  I have not been able to reproduce this outside of this 
> environment.
> 
>  
> 
>  
> 
> I put the following process function right after my kafka source.
> 
>  
> 
> --------
> 
> 
> AfterSource
> 
> ts=1647274892728
> watermark=1647575140007
> record=...
> 
>  
> 
> 
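> // Needs org.apache.flink.streaming.api.functions.ProcessFunction and
> // org.apache.flink.util.Collector; LOGGER is assumed to be an SLF4J logger
> // defined in the enclosing class.
> public static class TextLog extends ProcessFunction<Record, Record> {
>     private final String label;
> 
>     public TextLog(String label) {
>         this.label = label;
>     }
> 
>     @Override
>     public void processElement(Record record, Context context, Collector<Record> collector)
>             throws Exception {
>         // Log the record's event timestamp next to the operator's current watermark.
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>                 label, context.timestamp(), context.timerService().currentWatermark(), record);
>         // Forward the record unchanged.
>         collector.collect(record);
>     }
> }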
> 
