Hi Jin, Flink is an open source project, so the community works on a best-effort basis. There's no guaranteed or quick support available. There are companies that provide commercial support if needed.
Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Fri, 8 Apr 2022 at 12:13, Jin Yi <j...@promoted.ai> wrote:

> confirmed that moving back to FlinkKafkaConsumer fixes things.
>
> is there some notification channel/medium that highlights critical bugs/issues in intended features like this pretty readily?
>
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <j...@promoted.ai> wrote:
>
>> based on symptoms/observations on the first operator (LogRequestFilter) watermark and event timestamps, it does seem like it's the bug. things track fine (timestamp > watermark) for the first batch of events, then the event timestamps go back into the past and are "late".
>>
>> looks like the 1.14 backport just got in 11 days ago (https://github.com/apache/flink/pull/19128). is there a way to easily test this fix locally? based on the threads, should i just move back to FlinkKafkaConsumer until 1.14.5?
>>
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>>
>>> Hi Jin,
>>>
>>> If you are using the new FLIP-27 sources like KafkaSource, per-partition watermarking (or per-split watermarking) is a default feature integrated into the SourceOperator. You might be hitting the bug described in FLINK-26018 [1], which happens during the first fetch of the source, when records in the first split push the watermark far ahead so that records from the other splits are treated as late events.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>>>
>>> Best regards,
>>>
>>> Qingsheng
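For reference, a minimal sketch of a FLIP-27 setup in which the watermark strategy is handed to fromSource() and is therefore evaluated per split inside the SourceOperator, as described above. The broker address, topic, group id, and deserializer are placeholders, not taken from this thread:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")                 // placeholder
        .setTopics("events")                                // placeholder
        .setGroupId("example-group")                        // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    // The strategy is passed to fromSource(), so watermarks are generated
    // per split inside the source rather than on the already-merged stream.
    WatermarkStrategy<String> strategy = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((element, kafkaTimestamp) -> kafkaTimestamp);

    DataStream<String> stream = env.fromSource(source, strategy, "kafka-source");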
>>> > On Apr 8, 2022, at 15:54, Jin Yi <j...@promoted.ai> wrote:
>>> >
>>> > what should the code look like to verify we're using per-partition watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
>>> >
>>> > we currently have it looking like:
>>> >
>>> > streamExecutionEnvironment.fromSource(
>>> >     KafkaSource.<T>builder().....build(),
>>> >     watermarkStrategy,
>>> >     "whatever",
>>> >     typeInfo);
>>> >
>>> > when running this job with the streamExecutionEnvironment parallelism set to 1, and the kafka source having 30 partitions, i'm seeing weird behaviors where the first operator after this source consumes events out of order (and therefore violates watermarks). the operator simply checks to see what "type" of event something is and uses side outputs to output the type-specific messages.
>>> >
>>> > here's a snippet of the event timestamp going back before the current watermark (the first instance of going backwards in time is the line with ts: 1649284171037):
>>> >
>>> > 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,317 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
>>> >
>>> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > I dove deeper. I wasn't actually using per-partition watermarks. Thank you for the help!
>>> >
>>> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > Thanks, Thias and Dongwon.
>>> >
>>> > I'll keep debugging this with the idle watermark turned off.
>>> >
>>> > Next TODOs:
>>> > - Verify that we're using per-partition watermarks. Our code matches the example but maybe something is disabling it.
>>> > - Enable logging of partition-consumer assignment, to see if that is the cause of the problem.
>>> > - Look at adding flags to set the source parallelism to see if that fixes the issue.
>>> >
>>> > Yes, I've seen Flink talks on creating our own watermarks through Kafka. Sounds like a good idea.
>>> >
>>> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> >
>>> > I totally agree with Schwalbe that per-partition watermarking allows # source tasks < # kafka partitions.
>>> >
>>> > Otherwise, Dan, you should suspect other possibilities like what Schwalbe said.
>>> >
>>> > Best,
>>> >
>>> > Dongwon
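The distinction behind the "verify that we're using per-partition watermarks" TODO above can be illustrated with the legacy FlinkKafkaConsumer. A rough sketch, assuming the same imports as the earlier sketch plus org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; Record, recordDeserializer, and kafkaProperties are placeholders:

    FlinkKafkaConsumer<Record> consumer =
        new FlinkKafkaConsumer<>("events", recordDeserializer, kafkaProperties);  // placeholders

    // Per-partition watermarks: the strategy is attached to the consumer itself,
    // so it runs once per Kafka partition and the source emits the minimum
    // watermark across its assigned partitions.
    consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

    DataStream<Record> records = env.addSource(consumer);

    // By contrast, calling assignTimestampsAndWatermarks(...) on the DataStream
    // returned by addSource(...) gives only per-subtask watermarks, which breaks
    // down when one subtask reads several partitions.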
>>> > On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>>> >
>>> > Hi Dan, Dongwon,
>>> >
>>> > I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … it would be interesting to see why it does not work for you.
>>> >
>>> > I'd like to clear up one tiny misconception here, when you write:
>>> >
>>> > >> - The same issue happens even if I use an idle watermark.
>>> >
>>> > You would expect to see glitches with watermarking when you enable idleness.
>>> > Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified).
>>> > With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be OK in some cases).
>>> > BTW, we predominantly use a mix of fast and slow sources (some that only update once a day) with hand-tuned watermarking and late event processing, and enabling idleness would break everything.
>>> >
>>> > Oversights put aside, things should work the way you implemented it.
>>> >
>>> > One thing I could imagine to be a cause is:
>>> > • that over time the kafka partitions get reassigned to different consumer subtasks, which would probably stress correct recalculation of watermarks. Hence #partitions == #subtasks might reduce the problem
>>> > • can you enable logging of partition-consumer assignment, to see if that is the cause of the problem
>>> > • also, involuntary restarts of the job can cause havoc, as this resets watermarking
>>> >
>>> > I'll be off next week, unable to take part in the active discussion …
>>> >
>>> > Sincere greetings
>>> >
>>> > Thias
>>> >
>>> > From: Dan Hill <quietgol...@gmail.com>
>>> > Sent: Friday, 18 March 2022 08:23
>>> > To: Dongwon Kim <eastcirc...@gmail.com>
>>> > Cc: user <user@flink.apache.org>
>>> > Subject: Re: Weird Flink Kafka source watermark behavior
>>> >
>>> > I'll try forcing # source tasks = # partitions tomorrow.
>>> >
>>> > Thank you, Dongwon, for all of your help!
>>> >
>>> > On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> >
>>> > I believe your job with per-partition watermarking should be working okay even in a backfill scenario.
>>> >
>>> > BTW, is the problem still observed even with # source tasks = # partitions?
>>> >
>>> > For committers:
>>> > Is there a way to confirm in the TM log that per-partition watermarking is used?
>>> >
>>> > On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > I hit this using event-time processing and no idleness detection. The same issue happens if I enable idleness.
>>> >
>>> > My code matches the code example for per-partition watermarking.
>>> >
>>> > On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> >
>>> > Hi Dan,
>>> >
>>> > I'm quite confused, as you already use per-partition watermarking.
>>> >
>>> > What I meant in the reply is:
>>> > - If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
>>> > - If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
>>> > - If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on the idleness detection.
>>> >
>>> > Regarding the idleness detection, which is based on processing time: what is your setting? If you set the value to 10 seconds, for example, you'll face the same problem unless the watermark of your backfill job catches up to real time within 10 seconds. If you increase the value to 1 minute, your backfill job has 1 minute to catch up to real time.
>>> >
>>> > Best,
>>> >
>>> > Dongwon
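To make the idleness setting discussed above concrete, a small sketch with example values only (5 seconds of allowed out-of-orderness, a 1 minute idleness timeout), reusing the placeholder Record type and consumer from the earlier sketch. The idleness timeout is measured in processing time, so a backfill source that stalls longer than the timeout is marked idle and stops holding back the watermark:

    WatermarkStrategy<Record> strategy = WatermarkStrategy
        .<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofMinutes(1));   // example values only

    consumer.assignTimestampsAndWatermarks(strategy);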
>>> > On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > Thanks Dongwon!
>>> >
>>> > Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source tasks < # kafka partitions. This should be called out in the docs or the bug should be fixed.
>>> >
>>> > On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> >
>>> > Hi Dan,
>>> >
>>> > Do you use the per-partition watermarking explained in [1]?
>>> >
>>> > I've also experienced a similar problem when running backfill jobs, specifically when # source tasks < # kafka partitions.
>>> > - When # source tasks = # kafka partitions, the backfill job works as expected.
>>> > - When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This can destroy the per-partition watermarking pattern, as explained in [2].
>>> >
>>> > Hope this helps.
>>> >
>>> > p.s. If you plan to use the per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, which can be counted as idleness.
>>> >
>>> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>> > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> > [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>> >
>>> > Best,
>>> >
>>> > Dongwon
>>> >
>>> > On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > I'm following the example from this section:
>>> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> >
>>> > On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > Other points:
>>> > - I'm using the kafka timestamp as event time.
>>> > - The same issue happens even if I use an idle watermark.
>>> >
>>> > On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > There are 12 Kafka partitions (to keep the structure similar to other low-traffic environments).
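One way to try the "# source tasks = # partitions" suggestion from earlier in the thread is to pin the source parallelism explicitly. A sketch, again reusing the placeholder consumer; 12 matches the partition count mentioned above and the operator name is arbitrary:

    DataStream<Record> events = env
        .addSource(consumer)
        .setParallelism(12)   // one source subtask per Kafka partition
        .name("kafka-source");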
>>> > On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> >
>>> > Hi.
>>> >
>>> > I'm running a backfill from a kafka topic with very few records spread across a few days. I'm seeing a case where the records coming from a kafka source have a watermark that's more recent (by hours) than the event time. I haven't seen this before when running this job. This violates what I'd assume the kafka source would do.
>>> >
>>> > Example problem:
>>> > 1. I have kafka records at ts=1000, 2000, ... 500000. The actual times are separated by a longer time period.
>>> > 2. My first operator after the FlinkKafkaConsumer sees:
>>> >    context.timestamp() = 1000
>>> >    context.timerService().currentWatermark() = 500000
>>> >
>>> > Details about how I'm running this:
>>> > - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>>> > - I'm using FlinkKafkaConsumer.
>>> > - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No idleness settings.
>>> > - I'm running similar code in all the environments. The main difference is low traffic. I have not been able to reproduce this outside this environment.
>>> >
>>> > I put the following process function right after my kafka source.
>>> >
>>> > --------
>>> > AfterSource
>>> > ts=1647274892728
>>> > watermark=1647575140007
>>> > record=...
>>> >
>>> > public static class TextLog extends ProcessFunction<Record, Record> {
>>> >     private final String label;
>>> >
>>> >     public TextLog(String label) {
>>> >         this.label = label;
>>> >     }
>>> >
>>> >     @Override
>>> >     public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
>>> >         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>> >             label, context.timestamp(), context.timerService().currentWatermark(), record);
>>> >         collector.collect(record);
>>> >     }
>>> > }
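For completeness, a sketch of how a diagnostic function like the TextLog above is typically wired in directly after the source, so that the logged watermark is what the first downstream operator actually sees; the operator name and the placeholder consumer are assumptions:

    DataStream<Record> records = env.addSource(consumer);   // or env.fromSource(...)

    records
        .process(new TextLog("AfterSource"))
        .name("after-source-watermark-debug");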