Hi Jin,

Flink is an open source project, so the community works on a best-effort
basis. There is no guaranteed or quick support available. Some companies
provide commercial support if needed.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 8 Apr 2022 at 12:13, Jin Yi <j...@promoted.ai> wrote:

> confirmed that moving back to FlinkKafkaConsumer fixes things.
>
> is there some notification channel/medium that highlights critical
> bugs/issues on the intended features like this pretty readily?
>
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <j...@promoted.ai> wrote:
>
>> based on symptoms/observations on the first operator (LogRequestFilter)
>> watermark and event timestamps, it does seem like it's the bug.  things
>> track fine (timestamp > watermark) for the first batch of events, then the
>> event timestamps go back into the past and are "late".
>>
>> looks like the 1.14 backport just got in 11 days ago (
>> https://github.com/apache/flink/pull/19128).  is there a way to easily
>> test this fix locally?  based on the threads, should i just move back to
>> FlinkKafkaConsumer until 1.14.5?
>>
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>>
>>> Hi Jin,
>>>
>>> If you are using new FLIP-27 sources like KafkaSource, per-partition
>>> watermarking (or per-split watermarking) is a default feature integrated
>>> in SourceOperator. You might be hitting the bug described in FLINK-26018
>>> [1]: during the first fetch of the source, records in the first split push
>>> the watermark far ahead, and records from the other splits are then
>>> treated as late events.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26018
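
A simplified model of the failure mode described above, in plain Java with no Flink dependency. The class and method names (`FirstFetchModel`, `onRecord`, `currentWatermark`) are hypothetical, and this is not Flink's SourceOperator code. It only assumes that the combined watermark is the minimum over the splits that have reported at least one record, so a watermark computed while only the first split has reported can run ahead of the other splits.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model: combined watermark = min over the splits that have reported so far. */
final class FirstFetchModel {
    // Highest timestamp seen per split; a split is absent until its first record arrives.
    private final Map<String, Long> maxTimestampPerSplit = new HashMap<>();
    // Watermarks never move backwards, so remember the highest one emitted.
    private long emittedWatermark = Long.MIN_VALUE;

    /** Processes one record; returns true if it is late relative to the watermark before it. */
    boolean onRecord(String split, long timestamp) {
        boolean late = timestamp <= emittedWatermark;
        maxTimestampPerSplit.merge(split, timestamp, Math::max);
        long minOverReportedSplits = Collections.min(maxTimestampPerSplit.values());
        emittedWatermark = Math.max(emittedWatermark, minOverReportedSplits);
        return late;
    }

    long currentWatermark() {
        return emittedWatermark;
    }
}
```

In this model, feeding every record of one split before the first record of another makes the second split's records late, which matches the symptom in the thread. Holding the watermark back until every assigned split has reported would avoid it.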
>>>
>>> Best regards,
>>>
>>> Qingsheng
>>>
>>>
>>> > On Apr 8, 2022, at 15:54, Jin Yi <j...@promoted.ai> wrote:
>>> >
>>> > what should the code look like to verify we're using per-partition
>>> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
>>> 1.14.4?
>>> >
>>> > we currently have it looking like:
>>> >
>>> > streamExecutionEnvironment.fromSource(
>>> >    KafkaSource.<T>builder().....build(),
>>> >    watermarkStrategy,
>>> >    "whatever",
>>> >    typeInfo);
>>> >
>>> > when running this job with the streamExecutionEnvironment parallelism
>>> set to 1, and the kafka source having 30 partitions, i'm seeing weird
>>> behaviors where the first operator after this source consumes events out of
>>> order (and therefore, violates watermarks).  the operator simply checks to
>>> see what "type" of event something is and uses side outputs to output the
>>> type-specific messages.  here's a snippet of the event timestamp going back
>>> before the current watermark (the first instance of going backwards in time
>>> is the line with ts: 1649284171037):
>>> >
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,317 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
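
A small helper for scanning log output like the above for late events, as a sketch in plain Java. The class name `LateEventScan` is hypothetical. It assumes each relevant line contains the fragment `ts: <millis> watermark: <millis>` exactly as printed by the LogRequestFilter lines in this message, and it treats a record as late when its timestamp is not ahead of the watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Finds log lines whose event timestamp is at or behind the logged watermark. */
final class LateEventScan {
    private static final Pattern TS_AND_WATERMARK =
            Pattern.compile("ts: (\\d+) watermark: (\\d+)");

    /** Returns the event timestamps of all late lines, in input order. */
    static List<Long> lateTimestamps(List<String> logLines) {
        List<Long> late = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = TS_AND_WATERMARK.matcher(line);
            if (!m.find()) {
                continue; // line carries no timestamp/watermark pair
            }
            long ts = Long.parseLong(m.group(1));
            long watermark = Long.parseLong(m.group(2));
            if (ts <= watermark) {
                late.add(ts);
            }
        }
        return late;
    }
}
```

Run against the snippet above, the first timestamp it should report is 1649284171037, the first entry that falls behind watermark 1649284187140.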
>>> >
>>> >
>>> >
>>> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> > I dove deeper.  I wasn't actually using per-partition watermarks.
>>> Thank you for the help!
>>> >
>>> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> > Thanks, Thias and Dongwon.
>>> >
>>> > I'll keep debugging this with the idle watermark turned off.
>>> >
>>> > Next TODOs:
>>> > - Verify that we’re using per-partition watermarks.  Our code matches
>>> the example but maybe something is disabling it.
>>> > - Enable logging of partition-consumer assignment, to see if that is
>>> the cause of the problem.
>>> > - Look at adding flags to set the source parallelism to see if that
>>> fixes the issue.
>>> >
>>> > Yes, I've seen Flink talks on creating our own watermarks through
>>> Kafka.  Sounds like a good idea.
>>> >
>>> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>> > I totally agree with Schwalbe that per-partition watermarking allows #
>>> source tasks < # kafka partitions.
>>> >
>>> > Otherwise, Dan, you should suspect other possibilities like what
>>> Schwalbe said.
>>> >
>>> > Best,
>>> >
>>> > Dongwon
>>> >
>>> > On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>> > Hi Dan, Dongwon,
>>> >
>>> >
>>> >
>>> > I share the opinion that when per-partition watermarking is enabled,
>>> you should observe correct behavior … would be interesting to see why it
>>> does not work for you.
>>> >
>>> >
>>> >
>>> > I’d like to clear one tiny misconception here when you write:
>>> >
>>> >
>>> >
>>> > >> - The same issue happens even if I use an idle watermark.
>>> >
>>> >
>>> >
>>> > You would expect to see glitches with watermarking when you enable
>>> idleness.
>>> >
>>> > Idleness sort of trades watermark correctness for reduced latency when
>>> processing timers (much simplified).
>>> >
>>> > With idleness enabled you have no guarantees whatsoever as to the
>>> quality of watermarks (which might be ok in some cases).
>>> >
>>> > BTW we predominantly use a mix of fast and slow sources (that only update
>>> once a day) with hand-crafted watermarking and late event processing, and
>>> enabling idleness would break everything.
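
The trade-off described above can be sketched with a toy model in plain Java. The names (`IdlenessModel`, `markIdle`, `isLate`) are hypothetical and this is not Flink's implementation. It assumes only that the combined watermark is the minimum over inputs that are not marked idle, and that it never moves backwards.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model: idle inputs are ignored when combining watermarks. */
final class IdlenessModel {
    private final Map<String, Long> watermarkPerInput = new HashMap<>();
    private final Set<String> idleInputs = new HashSet<>();
    private long combined = Long.MIN_VALUE;

    void onWatermark(String input, long watermark) {
        idleInputs.remove(input); // an input that reports again counts as active
        watermarkPerInput.merge(input, watermark, Math::max);
        recompute();
    }

    void markIdle(String input) {
        idleInputs.add(input);
        recompute();
    }

    boolean isLate(long timestamp) {
        return timestamp <= combined;
    }

    long combinedWatermark() {
        return combined;
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : watermarkPerInput.entrySet()) {
            if (!idleInputs.contains(e.getKey())) {
                min = Math.min(min, e.getValue());
                anyActive = true;
            }
        }
        if (anyActive) {
            combined = Math.max(combined, min); // never regress
        }
    }
}
```

Once the slow input is marked idle, the fast input alone drives the watermark forward, so timers fire sooner. When the slow input resumes, its older records are already behind the watermark.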
>>> >
>>> >
>>> >
>>> > Oversights aside, things should work the way you implemented it.
>>> >
>>> >
>>> >
>>> > One thing I could imagine to be a cause:
>>> >
>>> >       • over time the kafka partitions get reassigned to different
>>> consumer subtasks, which would probably stress the correct recalculation of
>>> watermarks. Hence # partitions == # subtasks might reduce the problem
>>> >       • can you enable logging of partition-consumer assignment, to
>>> see if that is the cause of the problem?
>>> >       • involuntary restarts of the job can also cause havoc, as this
>>> resets watermarking
>>> >
>>> > I’ll be off next week, unable to take part in the active discussion …
>>> >
>>> >
>>> >
>>> > Sincere greetings
>>> >
>>> >
>>> >
>>> > Thias
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > From: Dan Hill <quietgol...@gmail.com>
>>> > Sent: Friday, 18 March 2022 08:23
>>> > To: Dongwon Kim <eastcirc...@gmail.com>
>>> > Cc: user <user@flink.apache.org>
>>> > Subject: Re: Weird Flink Kafka source watermark behavior
>>> >
>>> >
>>> >
>>> >
>>> > I'll try forcing # source tasks = # partitions tomorrow.
>>> >
>>> >
>>> >
>>> > Thank you, Dongwon, for all of your help!
>>> >
>>> >
>>> >
>>> > On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>> >
>>> > I believe your job with per-partition watermarking should be working
>>> okay even in a backfill scenario.
>>> >
>>> >
>>> >
>>> > BTW, is the problem still observed even with # source tasks = #
>>> partitions?
>>> >
>>> >
>>> >
>>> > For committers:
>>> >
>>> > Is there a way to confirm from the TM log that per-partition
>>> watermarking is used?
>>> >
>>> >
>>> >
>>> > On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> >
>>> > I hit this using event processing and no idleness detection.  The same
>>> issue happens if I enable idleness.
>>> >
>>> >
>>> >
>>> > My code matches the code example for per-partition watermarking.
>>> >
>>> >
>>> >
>>> > On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>> >
>>> > Hi Dan,
>>> >
>>> >
>>> >
>>> > I'm quite confused as you already use per-partition watermarking.
>>> >
>>> >
>>> >
>>> > What I meant in the reply is
>>> >
>>> > - If you don't use per-partition watermarking, # tasks < # partitions
>>> can cause the problem for backfill jobs.
>>> >
>>> > - If you don't use per-partition watermarking, # tasks = # partitions
>>> is going to be okay even for backfill jobs.
>>> >
>>> > - If you use per-partition watermarking, # tasks < # partitions
>>> shouldn't cause any problems unless you turn on the idleness detection.
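
The three cases above can be compared with a small simulation in plain Java. This is a sketch under stated assumptions, not Flink code: the names (`WatermarkScopeDemo`, `Rec`) are hypothetical, one task reads all partitions, and the watermark is "highest timestamp seen minus a bound", either over the merged stream or per partition with the minimum taken across partitions.

```java
import java.util.Arrays;
import java.util.List;

/** Counts late records under per-task versus per-partition watermarking. */
final class WatermarkScopeDemo {
    /** One record: the partition it came from and its event timestamp. */
    static final class Rec {
        final int partition;
        final long ts;

        Rec(int partition, long ts) {
            this.partition = partition;
            this.ts = ts;
        }
    }

    /** Single generator over the merged stream of all partitions. */
    static int lateWithPerTaskWatermark(List<Rec> records, long bound) {
        long maxTs = Long.MIN_VALUE;
        int late = 0;
        for (Rec r : records) {
            long watermark = maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - bound;
            if (r.ts <= watermark) {
                late++;
            }
            maxTs = Math.max(maxTs, r.ts);
        }
        return late;
    }

    /** One generator per partition; the task emits the minimum across all partitions. */
    static int lateWithPerPartitionWatermark(List<Rec> records, int partitions, long bound) {
        long[] maxTs = new long[partitions];
        Arrays.fill(maxTs, Long.MIN_VALUE); // partitions with no data hold the watermark back
        long emitted = Long.MIN_VALUE;
        int late = 0;
        for (Rec r : records) {
            if (r.ts <= emitted) {
                late++;
            }
            maxTs[r.partition] = Math.max(maxTs[r.partition], r.ts);
            long min = Long.MAX_VALUE;
            for (long m : maxTs) {
                min = Math.min(min, m);
            }
            if (min != Long.MIN_VALUE) {
                emitted = Math.max(emitted, min - bound);
            }
        }
        return late;
    }
}
```

A backfill that drains partition 0 before partition 1 produces late records in the per-task variant and none in the per-partition variant. With one partition per task the two variants coincide, which corresponds to the second case.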
>>> >
>>> >
>>> >
>>> > Regarding the idleness detection which is based on processing time,
>>> what is your setting? If you set the value to 10 seconds for example,
>>> you'll face the same problem unless the watermark of your backfill job
>>> catches up to real time within 10 seconds. If you increase the value to 1
>>> minute, your backfill job must catch up to real time within 1 minute.
>>> >
>>> >
>>> >
>>> > Best,
>>> >
>>> >
>>> >
>>> > Dongwon
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> >
>>> > Thanks Dongwon!
>>> >
>>> >
>>> >
>>> > Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
>>> tasks < # kafka partitions.  This should be called out in the docs or the
>>> bug should be fixed.
>>> >
>>> >
>>> >
>>> > On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>> >
>>> > Hi Dan,
>>> >
>>> >
>>> >
>>> > Do you use the per-partition watermarking explained in [1]?
>>> >
>>> > I've also experienced a similar problem when running backfill jobs
>>> specifically when # source tasks < # kafka partitions.
>>> >
>>> > - When # source tasks = # kafka partitions, the backfill job works as
>>> expected.
>>> >
>>> > - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>> multiple partitions. This case can destroy the per-partition patterns as
>>> explained in [2].
>>> >
>>> >
>>> >
>>> > Hope this helps.
>>> >
>>> >
>>> >
>>> > p.s. If you plan to use per-partition watermarking, be aware that
>>> idleness detection [3] can cause another problem when you run a backfill
>>> job. Kafka source tasks in a backfill job seem to read a batch of records
>>> from Kafka and then wait for downstream tasks to catch up, which can be
>>> counted as idleness.
>>> >
>>> >
>>> >
>>> > [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>> >
>>> > [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> >
>>> > [3]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>> >
>>> >
>>> >
>>> > Best,
>>> >
>>> >
>>> >
>>> > Dongwon
>>> >
>>> >
>>> >
>>> > On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> >
>>> > I'm following the example from this section:
>>> >
>>> >
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> >
>>> >
>>> >
>>> > On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> >
>>> > Other points
>>> >
>>> > - I'm using the kafka timestamp as event time.
>>> >
>>> > - The same issue happens even if I use an idle watermark.
>>> >
>>> >
>>> >
>>> > On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> >
>>> > There are 12 Kafka partitions (to keep the structure similar to other
>>> low traffic environments).
>>> >
>>> >
>>> >
>>> > On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>> wrote:
>>> >
>>> > Hi.
>>> >
>>> >
>>> >
>>> > I'm running a backfill from a kafka topic with very few records spread
>>> across a few days.  I'm seeing a case where the records coming from a kafka
>>> source have a watermark that's more recent (by hours) than the event time.
>>> I haven't seen this before when running this.  This violates what I'd
>>> assume the kafka source would do.
>>> >
>>> >
>>> >
>>> > Example problem:
>>> >
>>> > 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
>>> times are separated by a longer time period.
>>> >
>>> > 2.  My first operator after the FlinkKafkaConsumer sees:
>>> >
>>> >    context.timestamp() = 1000
>>> >
>>> >    context.timerService().currentWatermark() = 500000
>>> >
>>> >
>>> >
>>> > Details about how I'm running this:
>>> >
>>> > - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
>>> source.
>>> >
>>> > - I'm using FlinkKafkaConsumer
>>> >
>>> > - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>>> idleness settings.
>>> >
>>> > - I'm running similar code in all the environments.  The main
>>> difference is low traffic.  I have not been able to reproduce this outside
>>> of this environment.
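
For reference, a minimal sketch of how a bounded-out-of-orderness generator derives its watermark, in plain Java with no Flink dependency. It is written from memory of Flink's behavior (watermark = highest timestamp seen, minus the bound, minus 1 ms) and the class name `BoundedOutOfOrdernessSketch` is hypothetical. It shows one way the example numbers above could arise if a single generator sees the newest record before the oldest one. It is not a diagnosis of this job.

```java
/** Watermark trails the highest timestamp seen by a fixed bound. */
final class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Called for every record; only the highest timestamp matters. */
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** The watermark that would be emitted on the next periodic emit. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }
}
```

With a 5 s bound, a generator that has already seen ts=500000 reports a watermark of 494999, so a record with ts=1000 arriving afterwards is late by a wide margin.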
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > I put the following process function right after my kafka source.
>>> >
>>> >
>>> >
>>> > --------
>>> >
>>> >
>>> > AfterSource
>>> >
>>> > ts=1647274892728
>>> > watermark=1647575140007
>>> > record=...
>>> >
>>> >
>>> >
>>> >
>>> > // Logs each record's timestamp next to the current watermark, then
>>> > // forwards the record unchanged.
>>> > public static class TextLog extends ProcessFunction<Record, Record> {
>>> >     private final String label;
>>> >     public TextLog(String label) {
>>> >         this.label = label;
>>> >     }
>>> >     @Override
>>> >     public void processElement(Record record, Context context,
>>> >             Collector<Record> collector) throws Exception {
>>> >         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>> >                 label, context.timestamp(),
>>> >                 context.timerService().currentWatermark(), record);
>>> >         collector.collect(record);
>>> >     }
>>> > }
>>> >
>>>
>>>
