enforced-processing-total is zero for all missed join occurrences. I logged
all the metrics out at the time my stream processed the missed join, so let
me know if there are any other metrics that would help.

On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

> I'm not sure. When I ran with trace logging turned on I saw a bunch of
> messages like the ones below. Do those messages indicate
> "enforced-processing"? It gets logged right after the call
> to enforcedProcessingSensor.record.
>
> Continuing to process although some partitions are empty on the broker.
> There may be out-of-order processing for this task as a result. Partitions
> with local data: [status-5]. Partitions we gave up waiting for, with their
> corresponding deadlines: {event-5=1635881287722}. Configured
> max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.
>
> Continuing to process although some partitions are empty on the broker.
> There may be out-of-order processing for this task as a result. Partitions
> with local data: [event-5]. Partitions we gave up waiting for, with their
> corresponding deadlines: {status-5=1635881272754}. Configured
> max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
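
For what it's worth, the two messages above differ a lot in how late the
wall-clock time is relative to the logged deadline. A minimal sketch of that
arithmetic follows; the class and method names are made up for illustration
and are not part of Kafka Streams, and the numbers are copied from the two
messages.

```java
class DeadlineOvershoot {
    /** Milliseconds between the logged deadline and the logged wall-clock time. */
    public static long overshootMs(long deadlineMs, long wallClockMs) {
        return wallClockMs - deadlineMs;
    }

    public static void main(String[] args) {
        // First message: gave up waiting for event-5.
        System.out.println("event-5 overshoot (ms):  "
                + overshootMs(1635881287722L, 1635881287750L));
        // Second message: gave up waiting for status-5.
        System.out.println("status-5 overshoot (ms): "
                + overshootMs(1635881272754L, 1635881277998L));
    }
}
```

So the first message was logged 28 ms after its deadline and the second
5244 ms after its deadline.
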
>
> On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Can you check if the program ever does "enforced processing", ie,
>> `max.task.idle.ms` passed, and we process despite an empty input buffer.
>>
>> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>
>> As long as there is input data, we should never do "enforced processing"
>> and the metric should stay at zero.
>>
>>
>> -Matthias
>>
>> On 11/3/21 2:41 PM, Chad Preisler wrote:
>> > Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
>> > effect on this issue.
>> >
>> > On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com>
>> > wrote:
>> >
>> >> No, unfortunately that is not the case. The table record is written about 20
>> >> seconds before the stream record. I’ll crank up the time tomorrow and see
>> >> what happens.
>> >>
>> >> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >>
>> >>> Hard to tell, but as it seems that you can reproduce the issue, it
>> might
>> >>> be worth a try to increase the idle time further.
>> >>>
>> >>> I guess one corner case for stream-table join that is not resolved yet
>> >>> is when stream and table record have the same timestamp... For this
>> >>> case, the table record might not be processed first.
>> >>>
>> >>> Could you hit this case?
>> >>>
>> >>>
>> >>> -Matthias
>> >>>
>> >>> On 11/2/21 3:13 PM, Chad Preisler wrote:
>> >>>> Thank you for the information. We are using the Kafka 3.0 client library.
>> >>>> We are able to reliably reproduce this issue in our test environment now. I
>> >>>> removed my timestamp extractor, and I set max.task.idle.ms to 2000. I
>> >>>> also turned on trace logging for package
>> >>>> org.apache.kafka.streams.processor.internals.
>> >>>>
>> >>>> To create the issue we stopped the application and ran enough data to
>> >>>> create a lag of 400 messages. We saw 5 missed joins.
>> >>>>
>> >>>> From the stream-thread log messages we saw the event message, our stream
>> >>>> missed the join, and then several milliseconds later we saw the
>> >>>> stream-thread print out the status message. The stream-thread printed out
>> >>>> our status message a total of 5 times.
>> >>>>
>> >>>> Given that only a few milliseconds passed between missing the join and the
>> >>>> stream-thread printing the status message, would increasing
>> >>>> max.task.idle.ms help?
>> >>>>
>> >>>> Thanks,
>> >>>> Chad
>> >>>>
>> >>>> On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org>
>> >>> wrote:
>> >>>>
>> >>>>> Timestamp synchronization is not perfect, and as a matter of fact, we
>> >>>>> fixed a few gaps in the 3.0.0 release. We actually hope that we closed the
>> >>>>> last gaps in 3.0.0... *fingers-crossed* :)
>> >>>>>
>> >>>>>> We are using a timestamp extractor that returns 0.
>> >>>>>
>> >>>>> You can do this, and it effectively "disables" timestamp synchronization
>> >>>>> as records on the KTable side don't have a timeline any longer. As a
>> >>>>> side effect it also allows you to "bootstrap" the table, as records with
>> >>>>> timestamp zero will always be processed first (as they are smaller). Of
>> >>>>> course, you also don't have time synchronization for "future" data and
>> >>>>> your program becomes non-deterministic if you reprocess old data.
>> >>>>>
>> >>>>>> This seemed to be the only
>> >>>>>> way to bootstrap enough records at startup to avoid the missed
>> join.
>> >>>>>
>> >>>>> Using 3.0.0 and enabling timestamp synchronization via
>> >>>>> `max.task.idle.ms` config, should allow you to get the correct
>> >>> behavior
>> >>>>> without the zero-extractor (of course, your KTable data must have
>> >>>>> smaller timestamps than your KStream data).
>> >>>>>
>> >>>>>> If I use "timestamp synchronization" do I have to remove the zero
>> >>>>>> timestamp extractor? If I remove the zero timestamp extractor will
>> >>>>>> timestamp synchronization take care of the missed join issue on
>> >>> startup?
>> >>>>>
>> >>>>> To be more precise: timestamp synchronization is _always_ on. The
>> >>>>> question is just how strictly it is applied. By default, we do the
>> >>>>> weakest form, which is only best effort.
>> >>>>>
>> >>>>>> I'm guessing the issue here is that occasionally the poll request
>> is
>> >>> not
>> >>>>>> returning the matching record for the KTable side of the join
>> before
>> >>> the
>> >>>>>> task goes off and starts processing records.
>> >>>>>
>> >>>>> Yes, because of the default best-effort approach. That is why you should
>> >>>>> increase `max.task.idle.ms` to detect this case, "skip" processing,
>> >>>>> and let KS do another poll() to get the KTable data.
>> >>>>>
>> >>>>> 2.8 and earlier:
>> >>>>>
>> >>>>> max.task.idle.ms=0 -> best effort (no poll() retry)
>> >>>>> max.task.idle.ms>0 -> try to do another poll() until data is there or
>> >>>>> idle time passed
>> >>>>>
>> >>>>> Note: >0 might still "fail" even if there is data, because consumer
>> >>>>> fetch behavior is not predictable.
>> >>>>>
>> >>>>>
>> >>>>> 3.0:
>> >>>>>
>> >>>>> max.task.idle.ms=-1 -> best effort (no poll() retry)
>> >>>>> max.task.idle.ms=0 -> if there is data broker side, repeat poll()
>> >>>>> until you get the data
>> >>>>> max.task.idle.ms>0 -> even if there is no data broker side, wait until
>> >>>>> data becomes available or the idle time passed
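
To make the three 3.0 cases above concrete, here is a toy model of the
decision for a task that has one empty input buffer. This is only a sketch of
the rules as listed above, not the actual Kafka Streams code: the class,
enum, and parameter names are hypothetical, and treating "data exists broker
side" as "always poll again" for any value >= 0 is my reading of the list.

```java
class IdleDecisionSketch {
    public enum Action { PROCESS, POLL_AGAIN }

    /**
     * @param maxTaskIdleMs  configured max.task.idle.ms
     * @param brokerHasData  whether the empty partition still has unfetched data broker side
     * @param idleSoFarMs    how long the task has already been waiting (hypothetical input)
     */
    public static Action decide(long maxTaskIdleMs, boolean brokerHasData, long idleSoFarMs) {
        if (maxTaskIdleMs < 0) {
            // -1: best effort, no poll() retry.
            return Action.PROCESS;
        }
        if (brokerHasData) {
            // 0 or greater: data exists broker side, so poll() again until it arrives.
            return Action.POLL_AGAIN;
        }
        // No data broker side: 0 does not wait at all; > 0 waits until the idle time has passed.
        return idleSoFarMs < maxTaskIdleMs ? Action.POLL_AGAIN : Action.PROCESS;
    }

    public static void main(String[] args) {
        System.out.println(decide(-1, true, 0));       // best effort
        System.out.println(decide(0, true, 5000));     // keep polling for known data
        System.out.println(decide(2000, false, 1500)); // still inside the idle window
        System.out.println(decide(2000, false, 2000)); // idle time passed
    }
}
```

In this model the last case, processing after the idle time has passed with
no data available, is the one that corresponds to "enforced processing".
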
>> >>>>>
>> >>>>>
>> >>>>> Hope this helps.
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>> On 11/1/21 4:29 PM, Guozhang Wang wrote:
>> >>>>>> Hello Chad,
>> >>>>>>
>> >>>>>>    From your earlier comment, you mentioned "In my scenario the
>> records
>> >>>>> were
>> >>>>>> written to the KTable topic before the record was written to the
>> >>> KStream
>> >>>>>> topic." So I think Matthias and others have excluded this
>> possibility
>> >>>>> while
>> >>>>>> trying to help investigate.
>> >>>>>>
>> >>>>>> If only the matching records from KStream are returned via a single
>> >>>>>> consumer poll call but not the other records from KTable, then you would
>> >>>>>> miss this matched join result.
>> >>>>>>
>> >>>>>> Guozhang
>> >>>>>>
>> >>>>>>
>> >>>>>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <
>> >>> chad.preis...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Thank you for your response and the links to the presentations.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> *However, this seems to be orthogonal to your issue?*
>> >>>>>>>
>> >>>>>>> Yes. From what I see in the code it looks like you have a single
>> >>>>> consumer
>> >>>>>>> subscribed to multiple topics. Please correct me if I'm wrong.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> *By default, timestamp synchronization is disabled. Maybe enabling it would
>> >>>>>>> help?*
>> >>>>>>>
>> >>>>>>> We are using a timestamp extractor that returns 0. We did that
>> >>> because
>> >>>>> we
>> >>>>>>> were almost always missing joins on startup, and this seemed to be
>> >>> the
>> >>>>> only
>> >>>>>>> way to bootstrap enough records at startup to avoid the missed
>> join.
>> >>> We
>> >>>>>>> found a post that said doing that would make the KTable act like
>> the
>> >>>>>>> GlobalKTable at startup. So far this works great, we never miss a
>> >>> join
>> >>>>> on a
>> >>>>>>> startup. If I use "timestamp synchronization" do I have to remove
>> the
>> >>>>> zero
>> >>>>>>> timestamp extractor? If I remove the zero timestamp extractor will
>> >>>>>>> timestamp synchronization take care of the missed join issue on
>> >>> startup?
>> >>>>>>>
>> >>>>>>> I'm guessing the issue here is that occasionally the poll request
>> is
>> >>> not
>> >>>>>>> returning the matching record for the KTable side of the join
>> before
>> >>> the
>> >>>>>>> task goes off and starts processing records. Later when we put the
>> >>> same
>> >>>>>>> record on the topic and the KTable has had a chance to load more
>> >>> records
>> >>>>>>> the join works and everything is good to go. Because of the way
>> our
>> >>>>> system
>> >>>>>>> works no new status records have been written and so the new
>> record
>> >>>>> joins
>> >>>>>>> against the correct status.
>> >>>>>>>
>> >>>>>>> Do you agree that the poll request is returning the KStream record
>> >>> but
>> >>>>> not
>> >>>>>>> returning the KTable record and therefore the join is getting
>> >>> missed? If
>> >>>>>>> you don't agree, what do you think is going on? Is there a way to
>> >>> prove
>> >>>>>>> this out?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Chad
>> >>>>>>>
>> >>>>>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org
>> >
>> >>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Yes, a StreamThread has one consumer. The number of StreamThreads per
>> >>>>>>>> instance is configurable via `num.stream.threads`. Partitions are
>> >>>>>>>> assigned to threads similar to consumers in a plain consumer group.
>> >>>>>>>>
>> >>>>>>>> It seems you run with the default of one thread per instance. As
>> you
>> >>>>>>>> spin up 12 instances, it results in 12 threads for the
>> application.
>> >>> As
>> >>>>>>>> you have 12 partitions, using more threads won't be useful as no
>> >>>>>>>> partitions are left for them to process.
>> >>>>>>>>
>> >>>>>>>> For a stream-table join, there will be one task per "partition pair"
>> >>>>>>>> that computes the join for those partitions. So you get 12 tasks, and
>> >>>>>>>> each thread processes one task in your setup. I.e., a thread's consumer is
>> >>>>>>>> reading data for both input topics.
>> >>>>>>>>
>> >>>>>>>> Pausing happens on a per-partition basis: for joins there are two buffers
>> >>>>>>>> per task (one for each input topic partition). It's possible that one
>> >>>>>>>> partition is paused while the other is processed. However, this seems to
>> >>>>>>>> be orthogonal to your issue?
>> >>>>>>>>
>> >>>>>>>> For a GlobalKTable, you get an additional GlobalThread that only
>> >>> reads
>> >>>>>>>> the data from the "global topic" to update the GlobalKTable.
>> >>> Semantics
>> >>>>>>>> of KStream-KTable and KStream-GlobalKTable joins are different:
>> Cf
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>
>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
>> >>>>>>>>
>> >>>>>>>> For the timestamp synchronization, you may check out the
>> >>>>>>>> `max.task.idle.ms` config. By default, timestamp synchronization is
>> >>>>>>>> disabled. Maybe enabling it would help?
>> >>>>>>>>
>> >>>>>>>> You may also check out slides 34-38:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>
>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>> >>>>>>>>
>> >>>>>>>> There is one corner case: if two records with the same timestamp come
>> >>>>>>>> in, it's not defined which one will be processed first.
>> >>>>>>>>
>> >>>>>>>> Hope this helps.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
>> >>>>>>>>> Yes, this helped. I have some additional questions.
>> >>>>>>>>>
>> >>>>>>>>> Does StreamThread have one consumer? (Looks like it, but just
>> want
>> >>> to
>> >>>>>>>>> confirm)
>> >>>>>>>>> Is there a separate StreamThread for each topic including the
>> >>> KTable?
>> >>>>>>>>> If a KTable is a StreamThread and there is a  StreamTask for
>> that
>> >>>>>>> KTable,
>> >>>>>>>>> could my buffer be getting filled up, and the mainConsumer for
>> the
>> >>>>>>> KTable
>> >>>>>>>>> be getting paused? I see this code in StreamTask#addRecords.
>> >>>>>>>>>
>> >>>>>>>>> // if after adding these records, its partition queue's buffered size has been
>> >>>>>>>>> // increased beyond the threshold, we can then pause the consumption for this partition
>> >>>>>>>>> if (newQueueSize > maxBufferedSize) {
>> >>>>>>>>>     mainConsumer.pause(singleton(partition));
>> >>>>>>>>> }
>> >>>>>>>>>
>> >>>>>>>>> Is there any specific logging that I can set to debug or trace
>> that
>> >>>>>>> would
>> >>>>>>>>> help me troubleshoot? I'd prefer not to turn debug and/or trace
>> on
>> >>> for
>> >>>>>>>>> every single class.
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Chad
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com>
>> >>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Chad,
>> >>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
>> >>> internal
>> >>>>>>>> code
>> >>>>>>>>>> that reads records for the join?
>> >>>>>>>>>> --> You can check StreamThread#pollPhase, where the stream thread (main
>> >>>>>>>>>> consumer) periodically polls records. It then processes each topology
>> >>>>>>>>>> node with these polled records in stream tasks (StreamTask#process).
>> >>>>>>>>>>
>> >>>>>>>>>> Hope that helps.
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks.
>> >>>>>>>>>> Luke
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
>> >>>>>>>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you’re
>> >>> seeing
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>
>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
>> >>>>>>>>>>>
>> >>>>>>>>>>> Gilles
>> >>>>>>>>>>>
>> >>>>>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <
>> >>> chad.preis...@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Hello,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I have a stream application that does a KStream to KTable
>> left
>> >>>>> join.
>> >>>>>>>> We
>> >>>>>>>>>>>> seem to be occasionally missing joins (KTable side is null).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
>> >>> internal
>> >>>>>>>>>> code
>> >>>>>>>>>>>> that reads records for the join? I've poked around the Kafka
>> >>> code
>> >>>>>>>> base,
>> >>>>>>>>>>> but
>> >>>>>>>>>>>> there is a lot there. I imagine there is some consumer poll
>> for
>> >>>>> each
>> >>>>>>>>>> side
>> >>>>>>>>>>>> of the join, and possibly a background thread for reading the
>> >>>>> KTable
>> >>>>>>>>>>> topic.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I figure there are several possible causes of this issue, and
>> >>> since
>> >>>>>>>>>>> nothing
>> >>>>>>>>>>>> is really jumping out in my code, I was going to start
>> looking
>> >>> at
>> >>>>>>> the
>> >>>>>>>>>>> Kafka
>> >>>>>>>>>>>> code to see if there is something I can do to fix this.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> Chad
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
