Hi Jin, 

Unfortunately, I don’t have a quick workaround in mind other than increasing 
the out-of-orderness tolerance. 
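
For reference, the effect of a larger tolerance can be sketched in plain Java 
without any Flink dependency. This is only an illustration: the class and 
method names are made up, the watermark is recomputed per record rather than 
periodically, and the timestamps are copied from the LogRequestFilter log 
excerpt quoted further down.

```java
// Hypothetical sketch, not Flink code: counts records that would arrive
// behind a bounded-out-of-orderness watermark for a given tolerance.
final class OutOfOrdernessSketch {

    // Watermark is modelled as (max timestamp seen - tolerance - 1); a record
    // counts as late when its timestamp is <= the watermark at arrival.
    static int countLate(long[] timestamps, long toleranceMs) {
        // Start value chosen so the first watermark is Long.MIN_VALUE (no underflow).
        long maxSeen = Long.MIN_VALUE + toleranceMs + 1;
        int late = 0;
        for (long ts : timestamps) {
            long watermark = maxSeen - toleranceMs - 1;
            if (ts <= watermark) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Five records from one batch, then two older records, as in the log.
        long[] ts = {
            1649284267139L, 1649284268138L, 1649284269138L,
            1649284270139L, 1649284271139L,
            1649284171037L, 1649284172057L
        };
        System.out.println(countLate(ts, 5_000L));   // 2
        System.out.println(countLate(ts, 120_000L)); // 0
    }
}
```

In a real job the corresponding knob is the Duration passed to 
WatermarkStrategy.forBoundedOutOfOrderness(...). The tolerance has to cover 
the largest gap between splits, which also delays every event-time timer by 
the same amount.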

Best regards, 

Qingsheng

> On Apr 8, 2022, at 18:12, Jin Yi <j...@promoted.ai> wrote:
> 
> confirmed that moving back to FlinkKafkaConsumer fixes things.
> 
> is there some notification channel/medium that highlights critical 
> bugs/issues on the intended features like this pretty readily?
> 
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <j...@promoted.ai> wrote:
> based on symptoms/observations on the first operator (LogRequestFilter) 
> watermark and event timestamps, it does seem like it's the bug.  things track 
> fine (timestamp > watermark) for the first batch of events, then the event 
> timestamps go back into the past and are "late".
> 
> looks like the 1.14 backport just got in 11 days ago 
> (https://github.com/apache/flink/pull/19128).  is there a way to easily test 
> this fix locally?  based on the threads, should i just move back to 
> FlinkKafkaConsumer until 1.14.5?
> 
> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> Hi Jin,
> 
> If you are using the new FLIP-27 sources such as KafkaSource, per-partition 
> watermarking (or per-split watermarking) is a default feature integrated into 
> SourceOperator. You might be hitting the bug described in FLINK-26018 [1]: 
> during the source's first fetch, records from the first split push the 
> watermark far ahead, so records from the other splits are then treated as 
> late events.  
> 
> [1] https://issues.apache.org/jira/browse/FLINK-26018
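
A rough way to picture the symptom, as a plain-Java sketch with no Flink 
dependency: the source's output watermark is meant to be the minimum over its 
per-split watermarks, so a split that has not emitted anything yet should hold 
it back. The class and method names are hypothetical, this is not the actual 
SourceOperator code path, and the numbers reuse the ts=1000 / watermark=500000 
example from the first message of this thread.

```java
// Hypothetical sketch, not Flink code: combining per-split watermarks.
final class SplitWatermarkSketch {

    // The combined watermark is the minimum of the split watermarks considered.
    static long combine(long... splitWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : splitWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // A record is late when its timestamp is not ahead of the watermark.
    static boolean isLate(long eventTs, long watermark) {
        return eventTs <= watermark;
    }

    public static void main(String[] args) {
        long firstSplit = 500_000L;        // first split already read up to ts=500000
        long otherSplit = Long.MIN_VALUE;  // other split has emitted nothing yet

        // Only the first split is taken into account: ts=1000 is late.
        System.out.println(isLate(1_000L, combine(firstSplit)));             // true
        // Both splits are taken into account: the watermark is held back.
        System.out.println(isLate(1_000L, combine(firstSplit, otherSplit))); // false
    }
}
```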
> 
> Best regards,
> 
> Qingsheng
> 
> 
> > On Apr 8, 2022, at 15:54, Jin Yi <j...@promoted.ai> wrote:
> > 
> > what should the code look like to verify we're using per-partition 
> > watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 
> > 1.14.4?
> > 
> > we currently have it looking like:
> > 
> > streamExecutionEnvironment.fromSource(
> >    KafkaSource.<T>builder().....build(),
> >    watermarkStrategy,
> >    "whatever",
> >    typeInfo);
> > 
> > when running this job with the streamExecutionEnvironment parallelism set 
> > to 1 and the kafka source having 30 partitions, i'm seeing weird behavior 
> > where the first operator after this source consumes events out of order 
> > (and therefore violates watermarks).  the operator simply checks what 
> > "type" of event something is and uses side outputs to emit the 
> > type-specific messages.  here's a snippet of the event timestamp going back 
> > before the current watermark (the first instance of going backwards in time 
> > is the entry with ts: 1649284171037):
> > 
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> > 2022-04-08 05:47:06,317 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> > 
> > 
> > 
> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <quietgol...@gmail.com> wrote:
> > I dove deeper.  I wasn't actually using per-partition watermarks.  Thank 
> > you for the help!
> > 
> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <quietgol...@gmail.com> wrote:
> > Thanks, Thias and Dongwon.
> > 
> > I'll keep debugging this with the idle watermark turned off.
> > 
> > Next TODOs:
> > - Verify that we’re using per-partition watermarks.  Our code matches the 
> > example but maybe something is disabling it.
> > - Enable logging of partition-consumer assignment, to see if that is the 
> > cause of the problem.
> > - Look at adding flags to set the source parallelism to see if that fixes 
> > the issue.
> > 
> > Yes, I've seen Flink talks on creating our own watermarks through Kafka.  
> > Sounds like a good idea.
> > 
> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> > I totally agree with Schwalbe that per-partition watermarking allows # 
> > source tasks < # kafka partitions. 
> > 
> > Otherwise, Dan, you should suspect other possibilities like what Schwalbe 
> > said.
> > 
> > Best,
> > 
> > Dongwon
> > 
> > On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias 
> > <matthias.schwa...@viseca.ch> wrote:
> > Hi Dan, Dongwon,
> > 
> >  
> > 
> > I share the opinion that when per-partition watermarking is enabled, you 
> > should observe correct behavior … would be interesting to see why it does 
> > not work for you.
> > 
> >  
> > 
> > I’d like to clear up one tiny misconception here when you write:
> > 
> >  
> > 
> > >> - The same issue happens even if I use an idle watermark.
> > 
> >  
> > 
> > You would expect to see glitches with watermarking when you enable idleness.
> > 
> > Idleness sort of trades watermark correctness for reduced latency when 
> > processing timers (much simplified).
> > 
> > With idleness enabled you have no guarantees whatsoever as to the quality 
> > of watermarks (which might be ok in some cases).
> > 
> > BTW, we predominantly use a mix of fast and slow sources (the latter only 
> > update once a day) with hand-tuned watermarking and late-event processing, 
> > and enabling idleness would break everything.
> > 
> >  
> > 
> > Oversights aside, things should work the way you implemented them.
> > 
> >  
> > 
> > One thing I could imagine to be a cause is
> > 
> >       • Over time, the Kafka partitions may get reassigned to different 
> > consumer subtasks, which would probably stress the correct recalculation of 
> > watermarks. Hence # partitions == # subtasks might reduce the problem.
> >       • Can you enable logging of partition-consumer assignment to see if 
> > that is the cause of the problem?
> >       • Involuntary restarts of the job can also cause havoc, as they reset 
> > watermarking.
> >  
> > 
> > I’ll be off next week, unable to take part in the active discussion …
> > 
> >  
> > 
> > Sincere greetings
> > 
> >  
> > 
> > Thias
> > 
> >  
> > 
> >  
> > 
> >  
> > 
> >  
> > 
> > From: Dan Hill <quietgol...@gmail.com> 
> > Sent: Friday, 18 March 2022 08:23
> > To: Dongwon Kim <eastcirc...@gmail.com>
> > Cc: user <user@flink.apache.org>
> > Subject: Re: Weird Flink Kafka source watermark behavior
> > 
> >  
> > 
> >  
> > 
> > I'll try forcing # source tasks = # partitions tomorrow.
> > 
> >  
> > 
> > Thank you, Dongwon, for all of your help!
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> > 
> > I believe your job with per-partition watermarking should be working okay 
> > even in a backfill scenario. 
> > 
> >  
> > 
> > BTW, is the problem still observed even with # source tasks = # partitions?
> > 
> >  
> > 
> > For committers:
> > 
> > Is there a way to confirm that per-partition watermarking is used in TM log?
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
> > 
> > I hit this using event processing and no idleness detection.  The same 
> > issue happens if I enable idleness.
> > 
> >  
> > 
> > My code matches the code example for per-partition watermarking.
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> > 
> > Hi Dan,
> > 
> >  
> > 
> > I'm quite confused as you already use per-partition watermarking.
> > 
> >  
> > 
> > What I meant in the reply is
> > 
> > - If you don't use per-partition watermarking, # tasks < # partitions can 
> > cause the problem for backfill jobs.
> > 
> > - If you don't use per-partition watermarking, # tasks = # partitions is 
> > going to be okay even for backfill jobs.
> > 
> > - If you use per-partition watermarking, # tasks < # partitions shouldn't 
> > cause any problems unless you turn on the idleness detection.
> > 
> >  
> > 
> > Regarding idleness detection, which is based on processing time: what is 
> > your setting? If you set the value to 10 seconds, for example, you'll face 
> > the same problem unless the watermark of your backfill job catches up to 
> > real time within 10 seconds. If you increase the value to 1 minute, your 
> > backfill job should catch up to real time within 1 minute.
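
A simplified model of why a processing-time idle timeout matters, as a 
plain-Java sketch with no Flink dependency. It assumes that a split with no 
records for at least the timeout is marked idle and is then left out of the 
minimum, which lets the combined watermark jump ahead. The class name, method 
name and all numbers are hypothetical, and returning Long.MIN_VALUE when every 
split is idle is a simplification of this sketch.

```java
// Hypothetical sketch, not Flink code: idle splits are excluded from the minimum.
final class IdlenessSketch {

    // splitWatermarks[i]   : event-time watermark of split i
    // lastRecordProcMs[i]  : processing time at which split i last emitted a record
    // nowMs, idleTimeoutMs : current processing time and the configured idle timeout
    static long combinedWatermark(long[] splitWatermarks, long[] lastRecordProcMs,
                                  long nowMs, long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < splitWatermarks.length; i++) {
            boolean idle = nowMs - lastRecordProcMs[i] >= idleTimeoutMs;
            if (!idle) {
                min = Math.min(min, splitWatermarks[i]);
                anyActive = true;
            }
        }
        // Simplification: with no active split, report "no watermark yet".
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] watermarks = {500_000L, 1_000L};
        long[] lastRecord = {11_000L, 0L}; // second split has been waiting for 12 s
        long now = 12_000L;

        // 10 s timeout: the slow split is dropped and the watermark jumps ahead.
        System.out.println(combinedWatermark(watermarks, lastRecord, now, 10_000L)); // 500000
        // 60 s timeout: the slow split still holds the watermark back.
        System.out.println(combinedWatermark(watermarks, lastRecord, now, 60_000L)); // 1000
    }
}
```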
> > 
> >  
> > 
> > Best,
> > 
> >  
> > 
> > Dongwon
> > 
> >  
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
> > 
> > Thanks Dongwon!
> > 
> >  
> > 
> > Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source 
> > tasks < # kafka partitions.  This should be called out in the docs or the 
> > bug should be fixed.
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> > 
> > Hi Dan,
> > 
> >  
> > 
> > Do you use the per-partition watermarking explained in [1]?
> > 
> > I've also experienced a similar problem when running backfill jobs 
> > specifically when # source tasks < # kafka partitions. 
> > 
> > - When # source tasks = # kafka partitions, the backfill job works as 
> > expected.
> > 
> > - When # source tasks < # kafka partitions, a Kafka consumer consumes 
> > multiple partitions. This case can destroy the per-partition patterns as 
> > explained in [2].
> > 
> >  
> > 
> > Hope this helps.
> > 
> >  
> > 
> > p.s. If you plan to use per-partition watermarking, be aware that 
> > idleness detection [3] can cause another problem when you run a backfill 
> > job. Kafka source tasks in a backfill job seem to read a batch of records 
> > from Kafka and then wait for downstream tasks to catch up, which can be 
> > counted as idleness.
> > 
> >  
> > 
> > [1] 
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> > 
> > [2] 
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> > 
> > [3] 
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> > 
> >  
> > 
> > Best,
> > 
> >  
> > 
> > Dongwon
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
> > 
> > I'm following the example from this section:
> > 
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
> > 
> > Other points
> > 
> > - I'm using the kafka timestamp as event time.
> > 
> > - The same issue happens even if I use an idle watermark.
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
> > 
> > There are 12 Kafka partitions (to keep the structure similar to other low 
> > traffic environments).
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
> > 
> > Hi.
> > 
> >  
> > 
> > I'm running a backfill from a kafka topic with very few records spread 
> > across a few days.  I'm seeing a case where the records coming from a kafka 
> > source have a watermark that's more recent (by hours) than the event time.  
> > I haven't seen this before when running this.  This violates what I'd 
> > assume the kafka source would do.
> > 
> >  
> > 
> > Example problem:
> > 
> > 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times are 
> > separated by a longer time period.
> > 
> > 2.  My first operator after the FlinkKafkaConsumer sees:
> > 
> >    context.timestamp() = 1000
> > 
> >    context.timerService().currentWatermark() = 500000
> > 
> >  
> > 
> > Details about how I'm running this:
> > 
> > - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> > 
> > - I'm using FlinkKafkaConsumer
> > 
> > - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness 
> > settings.
> > 
> > - I'm running similar code in all the environments.  The main difference is 
> > low traffic.  I have not been able to reproduce this outside of this 
> > environment.
> > 
> >  
> > 
> >  
> > 
> > I put the following process function right after my kafka source.
> > 
> >  
> > 
> > --------
> > 
> > 
> > AfterSource
> > 
> > ts=1647274892728
> > watermark=1647575140007
> > record=...
> > 
> >  
> > 
> > 
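> > // Logs each record's event timestamp next to the operator's current watermark.
> > // Needs: org.apache.flink.streaming.api.functions.ProcessFunction,
> > // org.apache.flink.util.Collector, org.slf4j.Logger, org.slf4j.LoggerFactory.
> > public static class TextLog extends ProcessFunction<Record, Record> {
> >     // Assumption: SLF4J is the logging facade in use.
> >     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
> >     private final String label;
> > 
> >     public TextLog(String label) {
> >         this.label = label;
> >     }
> > 
> >     @Override
> >     public void processElement(Record record, Context context, Collector<Record> collector)
> >             throws Exception {
> >         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
> >                 label, context.timestamp(),
> >                 context.timerService().currentWatermark(), record);
> >         // Forward the record unchanged.
> >         collector.collect(record);
> >     }
> > }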
> > 
> 
