Hi Jiri,

Thank you for the follow up.

I guess, it can happen that during start-up and the respective
rebalances some partitions are read more often than others and that
consequently the timestamps in the repartition topic are mixed up more
than during normal operation. Unfortunately, I do not know how to
resolve this other than you did.

Best,
Bruno

On Mon, Feb 24, 2020 at 10:58 AM Samek, Jiří <sa...@avast.com.invalid> wrote:
>
> Hi all,
>
> I am writing to describe where I got with the issue.
>
> The next thing I wanted to do was to check topics if they contain records
> with mixed timestamps in single partition that could cause the
> warning "Skipping record for expired segment" - meaning, timestamp of
> incoming record is behind observed stream time and out-of a join window. I
> did the check. They do have the mixed timestamps. The input topics of the
> streaming app have timestamps in order. But I need to do repartitioning to
> introduce key needed for the join. And the repartition topics have mixed
> timestamps in single partition.
>
> It doesn't happen during continues run of the streaming application. It
> happens when I stop the streaming application for several minutes or
> more. The stream app is deployed in 10 instances. My theory is, that stream
> tasks doesn't start at the same time or doesn't process records at the same
> speed. So it can happen that one task is writing records with different
> timestamps than other task is writing to a given partition of the
> repartition topic. And so they get mixed. I am not aware of mechanism in
> Kafka Streams that could prevent mixing timestamps in repartition topics.
> If there is one, or if there is a configuration or something that could
> mitigate it, please let me know.
>
> So, in light of it, I think the warning is definitely a good thing. I have
> increased join-window duration to handle 2 hours pauses in stream
> processing (2 hour out-of-order records) which should do it for most cases.
> Increasing window duration has memory and cpu impact, I still wonder if
> there is more efficient way how to resolve it.
>
> Best Regards,
> Jiri
>
>
>
>
> On Tue, Feb 11, 2020 at 6:45 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Jiří,
> >
> > Hmm, that is a mystery indeed. to inspect the log, you could try
> > kafka-dump-log (I've never used it).
> >
> > What I have done before is use kafka-console-consumer, with the
> > following option specified:
> > --property <String: prop>
> >                                            properties include:
> >
> >                                                 print.timestamp=true|false
> >
> > Which version of Streams are you running? This is bringing up a
> > vague memory of when I refactored the retention time logic a while
> > back, and added logic to skip writing changelog records upon restore
> > when we detect that they would already be expired according to the
> > current stream time. Previously, we would go ahead and write them
> > and then have to rotate store segments later on during the restoration
> > when we reach the current stream time. This is a pretty heavy and
> > completely avoidable I/O operation. If this is what's happening, then
> > it's just an unforseen consequence of the new log level. We might
> > need to follow up with a change to suppress the warnings specifically
> > in this circumstance.
> >
> > Feel free to open a bug ticket with all the relevant version info, repro,
> > logs etc., you've collected if you feel like the above might be what's
> > happening.
> >
> > For clarity, this wouldn't be a correctness problem at all, just a
> > misleading
> > and troubling log message we shouldn't be producing.
> >
> > Thanks,
> > -John
> >
> > On Tue, Feb 11, 2020, at 11:21, Samek, Jiří wrote:
> > > Hi Bruno, John and Sophie,
> > >
> > > thank you very much for quick responses, you are the best. After thinking
> > > about it a little bit more, it seems fishy.
> > >
> > > From logs, I see that it is not happening when application is running
> > > normally.
> > >
> > > I have checked timestamps (windowStartTimestamp) - connecting local
> > > instance in debug mode to Kafka cluster. And they are mixed up. Not
> > always,
> > > there can be a day with good sequence and then a time interval with mixed
> > > up timestamps, like these (store retention is 20.6 minutes):
> > > StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:03:36.221Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:16.691Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:56:55.214Z
> > >
> > > Picking a few of these, the stack trace was like:
> > > put:134, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
> > > lambda$init$0:112, InMemoryWindowStore
> > > (org.apache.kafka.streams.state.internals)
> > > restore:-1, 69348804
> > >
> > (org.apache.kafka.streams.state.internals.InMemoryWindowStore$$Lambda$270)
> > > lambda$adapt$1:47, StateRestoreCallbackAdapter
> > > (org.apache.kafka.streams.processor.internals)
> > > restoreBatch:-1, 791473363
> > >
> > (org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$269)
> > > restoreBatch:89, CompositeRestoreListener
> > > (org.apache.kafka.streams.processor.internals)
> > > restore:92, StateRestorer (org.apache.kafka.streams.processor.internals)
> > > processNext:349, StoreChangelogReader
> > > (org.apache.kafka.streams.processor.internals)
> > > restore:93, StoreChangelogReader
> > > (org.apache.kafka.streams.processor.internals)
> > > updateNewAndRestoringTasks:389, TaskManager
> > > (org.apache.kafka.streams.processor.internals)
> > > runOnce:769, StreamThread (org.apache.kafka.streams.processor.internals)
> > > runLoop:698, StreamThread (org.apache.kafka.streams.processor.internals)
> > > run:671, StreamThread (org.apache.kafka.streams.processor.internals)
> > >
> > > So I believe it happens on stream restoration phase. And it's restoring
> > > state from internal changelog topic. It's all task.1_57 so I expect that
> > it
> > > is a single partition.
> > >
> > > Thinking about it, I don't understand how such a case can even
> > > theoretically happen. I expect that a window, in order to be written to
> > the
> > > changelog topic, first needs to go through "put"; so even if it's mixed
> > on
> > > the input side, it should be skipped if expired at the moment of "put"
> > > (relatively to observedStreamTime) and on restoration everything should
> > be
> > > fine.
> > >
> > > As the next step, I would like to list/inspect records and their
> > timestamps
> > > from given partition of the changelog topic via a command line tool (or
> > in
> > > some other way) - to confirm if they are really stored this way. If you
> > > have a tip on how to do it, please let me know.
> > >
> > > That is all I have for now. I would like to resolve it. I will post it
> > here
> > > if I come up with something new.
> > >
> > > Thank you
> > > Jiri
> > >
> > >
> > >
> > > On Mon, Feb 10, 2020 at 10:14 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > > >
> > > > Hey all,
> > > >
> > > > Sorry for the confusion. Bruno set me straight offline.
> > > >
> > > > Previously, we had metrics for each reason for skipping records, and
> > the
> > > > rationale was that you would monitor the metrics and only turn to the
> > logs
> > > > if you needed to *debug* unexpected record skipping. Note that skipping
> > > > records by itself isn't a cause for concern, since this is exactly what
> > > Streams
> > > > is designed to do in a number of situations.
> > > >
> > > > However, during the KIP-444 discussion, the decision was reversed, and
> > we
> > > > decided to just log one "roll-up" metric for all skips and increase the
> > > log
> > > > messages to warning level for debuggability. This particularly makes
> > sense
> > > > because you otherwise would have to restart the application to change
> > the
> > > > log level if you needed to figure out why the single skipped-record
> > metric
> > > > is non-zero. And then you may not even observe it again.
> > > >
> > > > I either missed the memo on that discussion, or participated in it and
> > > then
> > > > forgot it even happened. I'm not sure I want to look back at the
> > thread to
> > > > find out.
> > > >
> > > > Anyway, I've closed the PR I opened to move it back to debug. We should
> > > > still try to help figure out the root cause of this particular email
> > > thread,
> > > > though.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> > > > > While I agree that seems like it was probably a refactoring mistake,
> > I'm
> > > > > not
> > > > > convinced it isn't the right thing to do. John, can you reiterate the
> > > > > argument
> > > > > for setting it to debug way back when?
> > > > >
> > > > > I would actually present this exact situation as an argument for
> > > keeping it
> > > > > as
> > > > > warn, since something indeed seems fishy here that was only surfaced
> > > > > through this warning. That said, maybe the metric is the more
> > > appropriate
> > > > > way to bring attention to this: not sure if it's info or debug level
> > > > > though, or
> > > > > how likely it is that anyone really pays attention to it?
> > > > >
> > > > > On Mon, Feb 10, 2020 at 9:53 AM John Roesler <j...@vvcephei.org>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I’m sorry for the trouble. It looks like it was a mistake during
> > > > > >
> > > > > > https://github.com/apache/kafka/pull/6521
> > > > > >
> > > > > > Specifically, while addressing code review comments to change a
> > bunch
> > > of
> > > > > > other logs from debugs to warnings, that one seems to have been
> > > included by
> > > > > > accident:
> > > > > >
> > >
> > https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> > > > > >
> > > > > > I’ll see if I can fix it today.
> > > > > >
> > > > > > Regarding Bruno's thoughts, there was a pretty old decision to
> > > capture the
> > > > > > "skipped records" as a metric for visibility and log it at the
> > debug
> > > level
> > > > > > for debuggability. We decided that "warning" wasn't the right level
> > > because
> > > > > > Streams is operating completely as specified.
> > > > > >
> > > > > > However, I do agree that it doesn't seem right to see more skipped
> > > records
> > > > > > during start-up; I would expect to see exactly the same records
> > > skipped
> > > > > > during start-up as during regular processing, since the skipping
> > > logic is
> > > > > > completely deterministic and based on the sequence of timestamps
> > your
> > > > > > records have in the topic.  Maybe you just notice it more during
> > > startup?
> > > > > > I.e., if there are 1000 warning logs spread over a few months, then
> > > you
> > > > > > don't notice it, but when you see them all together at start-up,
> > it's
> > > more
> > > > > > concerning?
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am pretty sure this was intentional. All skipped records log
> > > > > > > messages are on WARN level.
> > > > > > >
> > > > > > > If a lot of your records are skipped on app restart with this log
> > > > > > > message on WARN-level, they were also skipped with the log
> > message
> > > on
> > > > > > > DEBUG-level. You simply did not know about it before. With an
> > > > > > > in-memory window store, this message is logged when a window
> > with a
> > > > > > > start time older than the current stream time minus the retention
> > > > > > > period is put into the window store, i.e., the window is NOT
> > > inserted
> > > > > > > into the window stroe. If you get a lot of them on app restart,
> > you
> > > > > > > should have a look at the timestamps of your records and the
> > > retention
> > > > > > > of your window store. If those values do not explain the
> > behavior,
> > > > > > > please try to find a minimal example that shows the issue and
> > post
> > > it
> > > > > > > here on the mailing list.
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří
> > <sa...@avast.com.invalid
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > in
> > > > > > > >
> > > > > >
> > >
> > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > > > > > > >
> > > > > > > > log level of "Skipping record for expired segment" was changed
> > > from
> > > > > > debug
> > > > > > > > to warn. Was it intentional change? Should it be somehow
> > handled
> > > by
> > > > > > user?
> > > > > > > > How can user handle it? I am getting a lot of these on app
> > > restart.
> > > > > > >
> > > > > >
> > > > >
> > >
> > >
> > >
> > > --
> > >
> > > Jiří Samek | Software Developer
> > >
> > > AVAST Software s.r.o. | Pikrtova 1737/1a | 140 00  Praha 4
> > >
> > > M +420 734 524 549 | E sa...@avast.com | W www.avast.com
> > >
> >
>
>
> --
>
> *Jiří Samek * <sa...@avast.com>| *Software Developer*
>
> *AVAST Software s.r.o.* | Pikrtova 1737/1a | 140 00  Praha 4
>
> *M* +420 734 524 549 | *E* sa...@avast.com | *W* www.avast.com

Reply via email to