Hi Jiri,

Thank you for the follow-up.
I guess it can happen that during start-up and the respective rebalances some partitions are read more often than others, and that consequently the timestamps in the repartition topic are mixed up more than during normal operation. Unfortunately, I do not know how to resolve this other than the way you did.

Best,
Bruno

On Mon, Feb 24, 2020 at 10:58 AM Samek, Jiří <sa...@avast.com.invalid> wrote:
>
> Hi all,
>
> I am writing to describe where I got with the issue.
>
> The next thing I wanted to do was to check whether the topics contain records with mixed timestamps in a single partition that could cause the warning "Skipping record for expired segment" - meaning, the timestamp of an incoming record is behind the observed stream time and outside of a join window. I did the check. They do have mixed timestamps. The input topics of the streaming app have timestamps in order, but I need to repartition to introduce the key needed for the join, and the repartition topics have mixed timestamps in a single partition.
>
> It doesn't happen during a continuous run of the streaming application. It happens when I stop the streaming application for several minutes or more. The stream app is deployed in 10 instances. My theory is that stream tasks don't start at the same time, or don't process records at the same speed. So it can happen that one task is writing records with different timestamps than another task is writing to a given partition of the repartition topic, and so they get mixed. I am not aware of a mechanism in Kafka Streams that could prevent mixing timestamps in repartition topics. If there is one, or if there is a configuration or something that could mitigate it, please let me know.
>
> So, in light of it, I think the warning is definitely a good thing. I have increased the join-window duration to handle 2-hour pauses in stream processing (2 hours of out-of-order records), which should cover most cases.
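[Editor's note: the interleaving theory above can be illustrated with a small simulation. This is plain Python, not Kafka code, and the task names, timestamps, and the alternating interleave are illustrative assumptions: two tasks each write records with in-order timestamps into the same repartition partition, but one task is replaying a backlog, so the merged partition order is no longer monotone.]

```python
from itertools import chain

def producer(task_id, start_ts, step_ms, count):
    """One stream task emitting records with in-order timestamps."""
    return [(task_id, start_ts + i * step_ms) for i in range(count)]

# Task A has caught up (recent timestamps); task B is still replaying a backlog.
task_a = producer("A", start_ts=10_000, step_ms=100, count=5)
task_b = producer("B", start_ts=1_000, step_ms=100, count=5)

# A repartition topic partition receives whatever arrives first; here we
# model a simple alternating interleave of the two tasks' writes.
partition = list(chain.from_iterable(zip(task_a, task_b)))

timestamps = [ts for _, ts in partition]
print(timestamps)                        # [10000, 1000, 10100, 1100, 10200, 1200, 10300, 1300, 10400, 1400]
print(timestamps == sorted(timestamps))  # False: per-task order, but not per-partition order
```

Each task's own writes stay ordered; only the merged per-partition sequence is out of order, which matches what Jiří observes after a restart.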
> Increasing the window duration has memory and CPU impact, so I still wonder if there is a more efficient way to resolve it.
>
> Best Regards,
> Jiri
>
> On Tue, Feb 11, 2020 at 6:45 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Jiří,
> >
> > Hmm, that is a mystery indeed. To inspect the log, you could try kafka-dump-log (I've never used it).
> >
> > What I have done before is use kafka-console-consumer, with the following option specified:
> >
> >   --property <String: prop>
> >
> > properties include:
> >
> >   print.timestamp=true|false
> >
> > Which version of Streams are you running? This is bringing up a vague memory of when I refactored the retention time logic a while back and added logic to skip writing changelog records upon restore when we detect that they would already be expired according to the current stream time. Previously, we would go ahead and write them and then have to rotate store segments later on during the restoration when we reach the current stream time. This is a pretty heavy and completely avoidable I/O operation. If this is what's happening, then it's just an unforeseen consequence of the new log level. We might need to follow up with a change to suppress the warnings specifically in this circumstance.
> >
> > Feel free to open a bug ticket with all the relevant version info, repro, logs, etc. you've collected if you feel like the above might be what's happening.
> >
> > For clarity, this wouldn't be a correctness problem at all, just a misleading and troubling log message we shouldn't be producing.
> >
> > Thanks,
> > -John
> >
> > On Tue, Feb 11, 2020, at 11:21, Samek, Jiří wrote:
> > > Hi Bruno, John and Sophie,
> > >
> > > Thank you very much for the quick responses, you are the best. After thinking about it a little bit more, it seems fishy.
> > >
> > > From the logs, I see that it is not happening when the application is running normally.
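[Editor's note: John's kafka-console-consumer tip above, spelled out as a full invocation. This is a sketch with assumed placeholders: the broker address, the repartition topic name, and the partition number are not from the thread and must be substituted.]

```shell
# Print each record's timestamp alongside its value for one partition
# of the repartition topic (placeholder broker/topic/partition values):
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-app-join-repartition \
  --partition 57 \
  --from-beginning \
  --property print.timestamp=true
# Records are printed prefixed with CreateTime:<epoch-millis>, so
# out-of-order timestamps within the partition are visible directly.
```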
> > >
> > > I have checked the timestamps (windowStartTimestamp) by connecting a local instance in debug mode to the Kafka cluster. And they are mixed up. Not always - there can be a day with a good sequence and then a time interval with mixed-up timestamps, like these (store retention is 20.6 minutes):
> > >
> > > StreamThread-1.task.1_57, 2020-02-07T13:05:46.550Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:07.870Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:10:49.980Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:55.909Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:09:02.662Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:13:08.651Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:06:53.946Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:11:58.188Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:59:42.884Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:07:30.412Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:55:53.328Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:44:51.912Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:59:27.364Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:01:34.313Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:07:56.379Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:45:32.984Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:45:44.232Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:45:59.594Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:46:02.860Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:02:17.658Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:46:25.125Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:46:44.864Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:44:44.074Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:03:36.221Z
> > > StreamThread-1.task.1_57, 2020-02-07T13:12:16.691Z
> > > StreamThread-1.task.1_57, 2020-02-07T12:56:55.214Z
> > >
> > > Picking a few of these, the stack trace was like:
> > >
> > > put:134, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
> > > lambda$init$0:112, InMemoryWindowStore (org.apache.kafka.streams.state.internals)
> > > restore:-1, 69348804 (org.apache.kafka.streams.state.internals.InMemoryWindowStore$$Lambda$270)
> > > lambda$adapt$1:47, StateRestoreCallbackAdapter (org.apache.kafka.streams.processor.internals)
> > > restoreBatch:-1, 791473363 (org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$269)
> > > restoreBatch:89, CompositeRestoreListener (org.apache.kafka.streams.processor.internals)
> > > restore:92, StateRestorer (org.apache.kafka.streams.processor.internals)
> > > processNext:349, StoreChangelogReader (org.apache.kafka.streams.processor.internals)
> > > restore:93, StoreChangelogReader (org.apache.kafka.streams.processor.internals)
> > > updateNewAndRestoringTasks:389, TaskManager (org.apache.kafka.streams.processor.internals)
> > > runOnce:769, StreamThread (org.apache.kafka.streams.processor.internals)
> > > runLoop:698, StreamThread (org.apache.kafka.streams.processor.internals)
> > > run:671, StreamThread (org.apache.kafka.streams.processor.internals)
> > >
> > > So I believe it happens in the stream restoration phase, and it's restoring state from the internal changelog topic. It's all task.1_57, so I expect that it is a single partition.
> > >
> > > Thinking about it, I don't understand how such a case can even theoretically happen. I expect that a window, in order to be written to the changelog topic, first needs to go through "put"; so even if it's mixed on the input side, it should be skipped if expired at the moment of "put" (relative to observedStreamTime), and on restoration everything should be fine.
> > >
> > > As the next step, I would like to list/inspect records and their timestamps from the given partition of the changelog topic via a command-line tool (or in some other way) - to confirm whether they are really stored this way.
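[Editor's note: as a back-of-the-envelope sketch (plain Python, not Kafka code), the observed 20.6-minute retention can be applied to a few of the timestamps listed above. Once stream time has advanced to 13:13:08, a record at 12:44:51 is roughly 28 minutes behind, beyond the retention, which is exactly the condition for the "expired segment" warning.]

```python
from datetime import datetime, timedelta

# A few of the observed window timestamps from the same changelog partition.
observed = [
    "2020-02-07T13:05:46.550Z",
    "2020-02-07T13:12:07.870Z",
    "2020-02-07T13:13:08.651Z",  # stream time advances to here...
    "2020-02-07T12:44:51.912Z",  # ...then a much older record arrives
]
retention = timedelta(minutes=20.6)

def parse(ts):
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")

stream_time = datetime.min
for ts in map(parse, observed):
    stream_time = max(stream_time, ts)  # stream time only moves forward
    expired = ts < stream_time - retention
    print(ts, "EXPIRED" if expired else "ok")
```

The first three records pass; the last one is expired relative to the stream time established by the 13:13:08 record, so the warning during restore is consistent with these timestamps.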
> > > If you have a tip on how to do it, please let me know.
> > >
> > > That is all I have for now. I would like to resolve it, and I will post here if I come up with something new.
> > >
> > > Thank you
> > > Jiri
> > >
> > > On Mon, Feb 10, 2020 at 10:14 PM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > > Hey all,
> > > >
> > > > Sorry for the confusion. Bruno set me straight offline.
> > > >
> > > > Previously, we had metrics for each reason for skipping records, and the rationale was that you would monitor the metrics and only turn to the logs if you needed to *debug* unexpected record skipping. Note that skipping records by itself isn't a cause for concern, since this is exactly what Streams is designed to do in a number of situations.
> > > >
> > > > However, during the KIP-444 discussion, the decision was reversed, and we decided to just log one "roll-up" metric for all skips and increase the log messages to warning level for debuggability. This particularly makes sense because otherwise you would have to restart the application to change the log level if you needed to figure out why the single skipped-record metric is non-zero. And then you may not even observe it again.
> > > >
> > > > I either missed the memo on that discussion, or participated in it and then forgot it even happened. I'm not sure I want to look back at the thread to find out.
> > > >
> > > > Anyway, I've closed the PR I opened to move it back to debug. We should still try to help figure out the root cause of this particular email thread, though.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> > > > > While I agree that seems like it was probably a refactoring mistake, I'm not convinced it isn't the right thing to do. John, can you reiterate the argument for setting it to debug way back when?
> > > > >
> > > > > I would actually present this exact situation as an argument for keeping it as warn, since something indeed seems fishy here that was only surfaced through this warning. That said, maybe the metric is the more appropriate way to bring attention to this: I'm not sure whether it should be info or debug level, though, or how likely it is that anyone really pays attention to it.
> > > > >
> > > > > On Mon, Feb 10, 2020 at 9:53 AM John Roesler <j...@vvcephei.org> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'm sorry for the trouble. It looks like it was a mistake during
> > > > > >
> > > > > > https://github.com/apache/kafka/pull/6521
> > > > > >
> > > > > > Specifically, while addressing code review comments to change a bunch of other logs from debug to warning, that one seems to have been included by accident:
> > > > > >
> > > > > > https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> > > > > >
> > > > > > I'll see if I can fix it today.
> > > > > >
> > > > > > Regarding Bruno's thoughts, there was a pretty old decision to capture the "skipped records" as a metric for visibility and log them at the debug level for debuggability. We decided that "warning" wasn't the right level because Streams is operating completely as specified.
> > > > > >
> > > > > > However, I do agree that it doesn't seem right to see more skipped records during start-up; I would expect to see exactly the same records skipped during start-up as during regular processing, since the skipping logic is completely deterministic and based on the sequence of timestamps your records have in the topic. Maybe you just notice it more during startup? I.e., if there are 1000 warning logs spread over a few months, you don't notice them, but when you see them all together at start-up, it's more concerning?
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am pretty sure this was intentional. All skipped-records log messages are on WARN level.
> > > > > > >
> > > > > > > If a lot of your records are skipped on app restart with this log message on WARN level, they were also skipped with the log message on DEBUG level. You simply did not know about it before. With an in-memory window store, this message is logged when a window with a start time older than the current stream time minus the retention period is put into the window store, i.e., the window is NOT inserted into the window store. If you get a lot of them on app restart, you should have a look at the timestamps of your records and the retention of your window store. If those values do not explain the behavior, please try to find a minimal example that shows the issue and post it here on the mailing list.
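[Editor's note: the rule Bruno states above can be sketched as a toy store in plain Python. This is illustrative only, not the actual InMemoryWindowStore implementation; retention and timestamps are made-up values.]

```python
class ToyWindowStore:
    """Minimal model of the expiry check: a window whose start time is
    older than (observed stream time - retention) is not inserted."""

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        self.observed_stream_time = float("-inf")
        self.windows = {}
        self.skipped = 0

    def put(self, window_start_ms, value):
        self.observed_stream_time = max(self.observed_stream_time, window_start_ms)
        if window_start_ms < self.observed_stream_time - self.retention_ms:
            self.skipped += 1  # would log "Skipping record for expired segment"
            return
        self.windows[window_start_ms] = value

store = ToyWindowStore(retention_ms=20 * 60 * 1000)   # 20-minute retention
store.put(0, "a")                # stream time -> 0, stored
store.put(30 * 60 * 1000, "b")   # stream time -> 30 min, stored
store.put(5 * 60 * 1000, "c")    # 25 min behind stream time -> skipped
print(store.skipped)             # 1
print(sorted(store.windows))     # [0, 1800000]
```

The check depends only on the sequence of timestamps, which is why the same out-of-order changelog records produce the same skips during restore as during live processing.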
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří <sa...@avast.com.invalid> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > In
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > > > > > > >
> > > > > > > > the log level of "Skipping record for expired segment" was changed from debug to warn. Was it an intentional change? Should it somehow be handled by the user? How can the user handle it? I am getting a lot of these on app restart.
> > > > > >
> > > > > > --
> > > > > > Jiří Samek | Software Developer
> > > > > > AVAST Software s.r.o. | Pikrtova 1737/1a | 140 00 Praha 4
> > > > > > M +420 734 524 549 | E sa...@avast.com | W www.avast.com
> >
> > --
> > *Jiří Samek * <sa...@avast.com> | *Software Developer*
> > *AVAST Software s.r.o.* | Pikrtova 1737/1a | 140 00 Praha 4
> > *M* +420 734 524 549 | *E* sa...@avast.com | *W* www.avast.com