Another question that I have: is there a way for us to detect how many messages have come out of order? And if possible, what is the delay?
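(There is no per-record metric I know of for this, but both numbers can be measured directly by replaying the topic with a plain consumer and comparing each record's timestamp against the running per-partition maximum. A minimal sketch; the broker address, topic name, and the "one empty poll means caught up" heuristic are placeholders/assumptions:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OutOfOrderCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-01:6667"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "out-of-order-check");      // throwaway group
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            Map<TopicPartition, Long> maxSeen = new HashMap<>(); // highest timestamp seen per partition
            long outOfOrder = 0;
            long maxDelayMs = 0;

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("source-topic")); // placeholder topic
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                    if (records.isEmpty()) {
                        break; // simplification: assume one empty poll means we are caught up
                    }
                    for (ConsumerRecord<byte[], byte[]> r : records) {
                        TopicPartition tp = new TopicPartition(r.topic(), r.partition());
                        long prevMax = maxSeen.getOrDefault(tp, Long.MIN_VALUE);
                        if (r.timestamp() < prevMax) {
                            // record is older than one already seen on this partition
                            outOfOrder++;
                            maxDelayMs = Math.max(maxDelayMs, prevMax - r.timestamp());
                        }
                        maxSeen.put(tp, Math.max(prevMax, r.timestamp()));
                    }
                }
            }
            System.out.printf("out-of-order records: %d, max delay: %d ms%n", outOfOrder, maxDelayMs);
        }
    }

A record counts as out of order here whenever its timestamp is smaller than one already seen on its partition, and the delay is how far behind the partition's maximum it arrived.)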
On Thu, May 4, 2017 at 6:17 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:

> Hi Matthias,
>
> Sure, we will look into this. In the meantime, we have run into another
> issue. We have started getting this error rather frequently, and the
> Streams app is unable to recover from it.
>
> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
>
> On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I would recommend double checking the following:
>>
>> - can you confirm that the filter does not remove all data for those
>>   time periods?
>> - I would also check the input to your AggregatorFunction() -- does it
>>   receive everything?
>> - same for .mapValues()
>>
>> This would help to understand in what part of the program the data
>> gets lost.
>>
>> -Matthias
>>
>> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
>>> Hi Garrett,
>>>
>>> Thanks for these insights. But we are not consuming old data. We want
>>> the Streams app to run in near real time, and that is how it is
>>> actually running. The lag never increases beyond a certain limit, so
>>> I don't think that's an issue.
>>>
>>> The values of the configs you mention are whatever Kafka offers by
>>> default, so I guess that should be fine.
>>>
>>> On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <garrett.bar...@gmail.com> wrote:
>>>
>>>> Mahendra,
>>>>
>>>> One possible thing I have seen that exhibits the same behavior of
>>>> missing windows of data is the retention configuration of the topics
>>>> (both the internal ones and your own). I was loading data that was
>>>> fairly old (weeks), using event-time semantics as the record
>>>> timestamp (custom timestamp extractor), and the cleanup was deleting
>>>> segments nearly right after they were written. In my case the
>>>> default cleanup run was every 5 minutes and the default retention
>>>> was 7 days, so every 5 minutes I lost data. In my logs I saw a ton
>>>> of warnings about 'offset not found' and Kafka skipping ahead to
>>>> whatever the next available offset was. The end result was gaps all
>>>> over my data. I don't have a good fix yet; I set the retention to
>>>> something massive, which I think is causing me other problems.
>>>>
>>>> Maybe that helps?
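(As a sketch of the workaround Garrett describes, raising retention on an input topic, here is one way to do it with the Java AdminClient; the broker address, topic name, and 30-day value are placeholders, and incrementalAlterConfigs needs a reasonably recent client:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class RaiseRetention {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-01:6667"); // placeholder

            try (Admin admin = Admin.create(props)) {
                // 30 days as an example; pick something larger than the oldest event time
                ConfigEntry retention =
                    new ConfigEntry("retention.ms", String.valueOf(30L * 24 * 60 * 60 * 1000));
                ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "source-topic"); // placeholder
                admin.incrementalAlterConfigs(Collections.singletonMap(
                        topic,
                        Collections.singletonList(new AlterConfigOp(retention, AlterConfigOp.OpType.SET))))
                     .all()
                     .get();
            }
        }
    }

For the internal changelog topics of window stores, Streams also has the windowstore.changelog.additional.retention.ms setting, which adds extra retention on top of the window's until() value.)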
>>>> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> What we did was read the data from the sink topic and print it to
>>>>> the console. And here's the raw data from that topic (the counts
>>>>> are randomized). As we can see, the data is certainly missing for
>>>>> some time windows. For instance, after 1493693760, the next
>>>>> timestamp for which data is present is 1493694300. That's around 9
>>>>> minutes of data missing.
>>>>>
>>>>> And this is just one instance. There are a lot of such instances in
>>>>> this file.
>>>>>
>>>>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>>>>>
>>>>>> Thanks for the update Matthias! And sorry for the delayed response.
>>>>>>
>>>>>> The reason we use .aggregate() is because we want to count the
>>>>>> number of unique values for a particular field in the message. So
>>>>>> we just add that particular field's value to a HashSet and then
>>>>>> take the size of the HashSet.
>>>>>>
>>>>>> On our side, we are also investigating, and it looks like there
>>>>>> might be a bug somewhere in our codebase. If that's the case, then
>>>>>> it's quite possible that there is no bug in Kafka Streams, except
>>>>>> the metric one.
>>>>>>
>>>>>> We will revert after confirming.
>>>>>>
>>>>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Just a follow up (we identified a bug in the "skipped records"
>>>>>>> metric). The reported value is not correct.
>>>>>>>
>>>>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>>>>>>>> Ok. That makes sense.
>>>>>>>>
>>>>>>>> Question: why do you use .aggregate() instead of .count() ?
>>>>>>>>
>>>>>>>> Also, can you share the code of your AggregatorFunction()? Did
>>>>>>>> you change any default setting of StreamsConfig?
>>>>>>>>
>>>>>>>> I still have no idea what could go wrong. Maybe you can run with
>>>>>>>> log level TRACE? Maybe we can get some insight from those logs.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>>>>>>>>> Oh, good point!
>>>>>>>>>
>>>>>>>>> The reason why there is only one row corresponding to each time
>>>>>>>>> window is that it only contains the latest value for the time
>>>>>>>>> window. So what we did was dump the data present in the sink
>>>>>>>>> topic to a DB using an upsert query. The primary key of the
>>>>>>>>> table was the time window. The file that I attached is actually
>>>>>>>>> the data present in the DB. And we know that there is no bug in
>>>>>>>>> our DB dump code, because we have been using it for a long time
>>>>>>>>> in production without any issues.
>>>>>>>>>
>>>>>>>>> The reason the count is zero for some time windows is that I
>>>>>>>>> subtracted a random number from the actual values and rounded
>>>>>>>>> it off to zero, for privacy reasons. The actual data doesn't
>>>>>>>>> have any zero values. I should have mentioned this earlier. My
>>>>>>>>> bad!
>>>>>>>>>
>>>>>>>>> The stream topology code looks something like this:
>>>>>>>>>
>>>>>>>>> stream
>>>>>>>>>     .filter()
>>>>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value))
>>>>>>>>>     .groupByKey()
>>>>>>>>>     .aggregate(HashSet::new, AggregatorFunction(),
>>>>>>>>>         TimeWindows.of(60000).until(3600000))
>>>>>>>>>     .mapValues(HashSet::size)
>>>>>>>>>     .toStream()
>>>>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
>>>>>>>>>     .to()
>>>>>>>>>
>>>>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the details (sorry that I forgot that you did share
>>>>>>>>>> the output already).
>>>>>>>>>>
>>>>>>>>>> Might be a dumb question, but what is the count for missing
>>>>>>>>>> windows in your second implementation?
>>>>>>>>>>
>>>>>>>>>> If there is no data for a window, it should not emit a window
>>>>>>>>>> with count zero, but nothing.
>>>>>>>>>>
>>>>>>>>>> Thus, looking at your output, I am wondering how it could
>>>>>>>>>> contain a line like:
>>>>>>>>>>
>>>>>>>>>>> 2017-04-27T04:53:00 0
>>>>>>>>>>
>>>>>>>>>> I am also wondering why your output only contains a single
>>>>>>>>>> value per window. As Streams outputs multiple updates per
>>>>>>>>>> window while the count is increasing, you should actually see
>>>>>>>>>> multiple records per window.
>>>>>>>>>>
>>>>>>>>>> Your code is like this:
>>>>>>>>>>
>>>>>>>>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>>>>>>>>>
>>>>>>>>>> Or do you have something more complex?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>>>>>>>>>> Can you somehow verify your output?
>>>>>>>>>>>
>>>>>>>>>>> Do you mean the Kafka Streams output? In the Kafka Streams
>>>>>>>>>>> output, we do see some missing values. I have attached the
>>>>>>>>>>> Kafka Streams output (for a few hours) in the very first
>>>>>>>>>>> email of this thread for reference.
>>>>>>>>>>>
>>>>>>>>>>> Let me also summarise what we have done so far.
>>>>>>>>>>>
>>>>>>>>>>> We took a dump of the raw data present in the source topic.
>>>>>>>>>>> We wrote a script to read this data and do the exact same
>>>>>>>>>>> aggregations that we do using Kafka Streams. And then we
>>>>>>>>>>> compared the output from Kafka Streams and our script.
>>>>>>>>>>>
>>>>>>>>>>> The difference that we observed in the two outputs is that
>>>>>>>>>>> there were a few rows (corresponding to some time windows)
>>>>>>>>>>> missing in the Streams output. For the time windows for which
>>>>>>>>>>> the data was present, the aggregated numbers matched exactly.
>>>>>>>>>>>
>>>>>>>>>>> This means either all the records for a particular time
>>>>>>>>>>> window are being skipped, or none. Now this is highly
>>>>>>>>>>> unlikely to happen. Maybe there is a bug somewhere in the
>>>>>>>>>>> RocksDB state stores? Just a speculation, not sure though.
>>>>>>>>>>> And there could even be a bug in the reported metric.
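(The AggregatorFunction() in the topology above was not shared on the thread; a hypothetical version matching the description earlier in the thread, collecting the distinct values of one field in a HashSet and taking its size via .mapValues(HashSet::size), could look like the following. The Message type and its getField() accessor are invented for illustration:

    import java.util.HashSet;
    import org.apache.kafka.streams.kstream.Aggregator;

    // Hypothetical message type; the real schema was not shared on the thread.
    class Message {
        private final String field;
        Message(String field) { this.field = field; }
        String getField() { return field; }
    }

    // Collects the distinct values of one field per key and window; the
    // unique count is then extracted downstream with .mapValues(HashSet::size).
    class AggregatorFunction implements Aggregator<String, Message, HashSet<String>> {
        @Override
        public HashSet<String> apply(String key, Message value, HashSet<String> aggregate) {
            aggregate.add(value.getField());
            return aggregate;
        }
    }

If a plain record count per window were enough, .count() would avoid the HashSet entirely, which seems to be the point of Matthias's question above.)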