Another question that I have: is there a way for us to detect how many
messages have arrived out of order? And if possible, what is the delay?
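
In case there is no built-in metric for this, one fallback we are thinking of
is a pass-through Transformer that remembers the largest record timestamp seen
so far and counts records whose timestamp is smaller, along with how far
behind they are. A rough, untested sketch (the class name and logging are made
up, and it is written against the current Transformer interface, which still
has punctuate()):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutOfOrderTracker<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private static final Logger LOG = LoggerFactory.getLogger(OutOfOrderTracker.class);

    private ProcessorContext context;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long outOfOrderCount = 0;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final long ts = context.timestamp();             // timestamp of the current record
        if (ts < maxTimestampSeen) {
            outOfOrderCount++;
            final long delayMs = maxTimestampSeen - ts;  // how far behind this record is
            LOG.info("out-of-order record #{}, behind by {} ms", outOfOrderCount, delayMs);
        } else {
            maxTimestampSeen = ts;
        }
        return KeyValue.pair(key, value);                // pass the record through unchanged
    }

    @Override
    public KeyValue<K, V> punctuate(final long timestamp) {
        return null;                                     // nothing to emit on punctuate
    }

    @Override
    public void close() {
    }
}

We would drop it into the topology early, e.g.
stream.transform(OutOfOrderTracker::new), so that it sees records in the same
order the Streams app does. Is there a better way?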

On Thu, May 4, 2017 at 6:17 AM, Mahendra Kariya <mahendra.kar...@go-jek.com>
wrote:

> Hi Matthias,
>
> Sure we will look into this. In the meantime, we have run into another
> issue. We have started getting this error rather frequently, and the
> Streams app is unable to recover from it.
>
> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
>
>
> On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I would recommend double-checking the following:
>>
>>  - can you confirm that the filter does not remove all data for those
>> time periods?
>>  - I would also check the input to your AggregatorFunction() -- does it
>> receive everything?
>>  - same for .mapValues()
>>
>> This would help us understand in which part of the program the data gets
>> lost.
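>>
>> As a quick (untested) sketch of how to check that: a pass-through
>> mapValues() that just logs what it sees and can be dropped between any two
>> stages of the topology (the logger name below is made up):
>>
>> import org.apache.kafka.streams.kstream.ValueMapper;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> public class TopologyTrace {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger("topology-trace");
>>
>>     // Returns a mapper that logs each value with the given tag and passes
>>     // it through unchanged.
>>     public static <V> ValueMapper<V, V> traced(final String tag) {
>>         return value -> {
>>             LOG.info("{}: {}", tag, value);
>>             return value;
>>         };
>>     }
>> }
>>
>> // e.g. stream.filter(...).mapValues(TopologyTrace.traced("after-filter"))
>> //             .groupByKey()...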
>>
>>
>> -Matthias
>>
>>
>> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
>> > Hi Garrett,
>> >
>> > Thanks for these insights. But we are not consuming old data. We want
>> the
>> > Streams app to run in near real time. And that is how it is actually
>> > running. The lag never increases beyond a certain limit. So I don't
>> think
>> > that's an issue.
>> >
>> > The configs that you mentioned are set to their Kafka defaults. So I
>> > guess that should be fine.
>> >
>> >
>> >
>> > On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <
>> garrett.bar...@gmail.com>
>> > wrote:
>> >
>> >> Mahendra,
>> >>
>> >>  One possible thing I have seen that exhibits the same behavior of
>> >> missing windows of data is the retention configuration of the topics
>> >> (both the internal ones and your own).  I was loading data that was
>> >> fairly old (weeks) and using event-time semantics as the record
>> >> timestamp (custom timestamp extractor), and the cleanup was deleting
>> >> segments nearly right after they were written.  In my case the default
>> >> cleanup run was every 5 minutes, and the default retention was 7 days,
>> >> so every 5 minutes I lost data.  In my logs I saw a ton of warnings
>> >> about 'offset not found' and Kafka skipping ahead to whatever the next
>> >> available offset was.  The end result was gaps all over my data.  I
>> >> don't have a good fix yet; I set the retention to something massive,
>> >> which I think is causing me other problems.
>> >>
>> >> Maybe that helps?
>> >>
>> >> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
>> >> mahendra.kar...@go-jek.com>
>> >> wrote:
>> >>
>> >>> Hi Matthias,
>> >>>
>> >>> What we did was read the data from the sink topic and print it to the
>> >>> console. And here's the raw data from that topic (the counts are
>> >>> randomized). As we can see, the data is certainly missing for some time
>> >>> windows. For instance, after 1493693760, the next timestamp for which
>> >>> the data is present is 1493694300. That's around 9 minutes of data
>> >>> missing.
>> >>>
>> >>> And this is just one instance. There are a lot of such instances in this
>> >>> file.
>> >>>
>> >>>
>> >>>
>> >>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
>> >>> mahendra.kar...@go-jek.com> wrote:
>> >>>
>> >>>> Thanks for the update Matthias! And sorry for the delayed response.
>> >>>>
>> >>>> The reason we use .aggregate() is that we want to count the number of
>> >>>> unique values of a particular field in the message. So we just add that
>> >>>> field's value to a HashSet and then take the size of the HashSet.
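>> >>>>
>> >>>> For reference, the aggregator is essentially something like the sketch
>> >>>> below (the message type and the field accessor here are made up; the
>> >>>> real code differs in the details):
>> >>>>
>> >>>> import java.util.HashSet;
>> >>>> import org.apache.kafka.streams.kstream.Aggregator;
>> >>>>
>> >>>> // placeholder for the real (protobuf) message class
>> >>>> interface Message {
>> >>>>     String getFieldOfInterest();
>> >>>> }
>> >>>>
>> >>>> public class AggregatorFunction
>> >>>>         implements Aggregator<String, Message, HashSet<String>> {
>> >>>>
>> >>>>     @Override
>> >>>>     public HashSet<String> apply(final String key, final Message value,
>> >>>>                                  final HashSet<String> aggregate) {
>> >>>>         // the set de-duplicates; the later mapValues(HashSet::size)
>> >>>>         // turns it into the unique-value count per window
>> >>>>         aggregate.add(value.getFieldOfInterest());
>> >>>>         return aggregate;
>> >>>>     }
>> >>>> }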
>> >>>>
>> >>>> On our side, we are also investigating and it looks like there might
>> be
>> >> a
>> >>>> bug somewhere in our codebase. If that's the case, then it's quite
>> >> possible
>> >>>> that there is no bug in Kafka Streams, except the metric one.
>> >>>>
>> >>>> We will get back to you after confirming.
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
>> >> matth...@confluent.io>
>> >>>> wrote:
>> >>>>
>> >>>>> Just a follow up (we identified a bug in the "skipped records"
>> metric).
>> >>>>> The reported value is not correct.
>> >>>>>
>> >>>>>
>> >>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>> >>>>>> Ok. That makes sense.
>> >>>>>>
>> >>>>>> Question: why do you use .aggregate() instead of .count() ?
>> >>>>>>
>> >>>>>> Also, can you share the code of your AggregatorFunction()? Did you
>> >>>>>> change any default settings of StreamsConfig?
>> >>>>>>
>> >>>>>> I still have no idea what could go wrong. Maybe you can run with log
>> >>>>>> level TRACE? Maybe we can get some insight from those logs.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>> >>>>>>> Oh good point!
>> >>>>>>>
>> >>>>>>> The reason there is only one row corresponding to each time window is
>> >>>>>>> that the table only contains the latest value for each window. What
>> >>>>>>> we did was dump the data present in the sink topic to a DB using an
>> >>>>>>> upsert query, with the time window as the primary key of the table.
>> >>>>>>> The file that I attached is actually the data present in the DB. And
>> >>>>>>> we know that there is no bug in our DB dump code because we have been
>> >>>>>>> using it in production for a long time without any issues.
>> >>>>>>>
>> >>>>>>> The reason the count is zero for some time windows is that I
>> >>>>>>> subtracted a random number from the actual values and rounded them
>> >>>>>>> off to zero, for privacy reasons. The actual data doesn't have any
>> >>>>>>> zero values. I should have mentioned this earlier. My bad!
>> >>>>>>>
>> >>>>>>> The stream topology code looks something like this.
>> >>>>>>>
>> >>>>>>> stream
>> >>>>>>>     .filter()
>> >>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value))
>> >>>>>>>     .groupByKey()
>> >>>>>>>     .aggregate(HashSet::new, new AggregatorFunction(),
>> >>>>>>>                TimeWindows.of(60000).until(3600000))
>> >>>>>>>     .mapValues(HashSet::size)
>> >>>>>>>     .toStream()
>> >>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
>> >>>>>>>     .to()
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
>> >>>>> matth...@confluent.io>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the details (sorry that I forgot that you did share
>> the
>> >>>>>>>> output already).
>> >>>>>>>>
>> >>>>>>>> Might be a dumb question, but what is the count for missing windows
>> >>>>>>>> in your second implementation?
>> >>>>>>>>
>> >>>>>>>> If there is no data for a window, it should not emit a window with
>> >>>>>>>> count zero; it should emit nothing.
>> >>>>>>>>
>> >>>>>>>> Thus, looking at your output, I am wondering how it could contain a
>> >>>>>>>> line like:
>> >>>>>>>>
>> >>>>>>>>> 2017-04-27T04:53:00 0
>> >>>>>>>>
>> >>>>>>>> I am also wondering why your output only contains a single value per
>> >>>>>>>> window. As Streams outputs multiple updates per window while the
>> >>>>>>>> count is increasing, you should actually see multiple records per
>> >>>>>>>> window.
>> >>>>>>>>
>> >>>>>>>> Your code is like this:
>> >>>>>>>>
>> >>>>>>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>> >>>>>>>>
>> >>>>>>>> Or do you have something more complex?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>> >>>>>>>>>> Can you somehow verify your output?
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Do you mean the Kafka Streams output? In the Kafka Streams output,
>> >>>>>>>>> we do see some missing values. I have attached the Kafka Streams
>> >>>>>>>>> output (for a few hours) in the very first email of this thread for
>> >>>>>>>>> reference.
>> >>>>>>>>>
>> >>>>>>>>> Let me also summarise what we have done so far.
>> >>>>>>>>>
>> >>>>>>>>> We took a dump of the raw data present in the source topic. We
>> >>>>>>>>> wrote a script to read this data and do the exact same aggregations
>> >>>>>>>>> that we do using Kafka Streams. And then we compared the output from
>> >>>>>>>>> Kafka Streams and our script.
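>> >>>>>>>>>
>> >>>>>>>>> (For context, the cross-check script amounts to something like the
>> >>>>>>>>> sketch below; the dump format -- one "timestampSeconds,fieldValue"
>> >>>>>>>>> pair per line -- is made up for illustration, the real script
>> >>>>>>>>> differs.)
>> >>>>>>>>>
>> >>>>>>>>> import java.io.BufferedReader;
>> >>>>>>>>> import java.io.FileReader;
>> >>>>>>>>> import java.util.HashMap;
>> >>>>>>>>> import java.util.HashSet;
>> >>>>>>>>> import java.util.Map;
>> >>>>>>>>>
>> >>>>>>>>> public class OfflineCrossCheck {
>> >>>>>>>>>     public static void main(String[] args) throws Exception {
>> >>>>>>>>>         // window start (aligned to the minute) -> unique field values
>> >>>>>>>>>         Map<Long, HashSet<String>> windows = new HashMap<>();
>> >>>>>>>>>         try (BufferedReader in = new BufferedReader(new FileReader(args[0]))) {
>> >>>>>>>>>             String line;
>> >>>>>>>>>             while ((line = in.readLine()) != null) {
>> >>>>>>>>>                 String[] parts = line.split(",", 2);
>> >>>>>>>>>                 long windowStart = (Long.parseLong(parts[0]) / 60) * 60;
>> >>>>>>>>>                 windows.computeIfAbsent(windowStart, w -> new HashSet<>())
>> >>>>>>>>>                        .add(parts[1]);
>> >>>>>>>>>             }
>> >>>>>>>>>         }
>> >>>>>>>>>         // print one "windowStart uniqueCount" row per minute, like
>> >>>>>>>>>         // the Streams output
>> >>>>>>>>>         windows.forEach((start, values) ->
>> >>>>>>>>>                 System.out.println(start + " " + values.size()));
>> >>>>>>>>>     }
>> >>>>>>>>> }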
>> >>>>>>>>>
>> >>>>>>>>> The difference that we observed in the two outputs is that there
>> >>>>> were a
>> >>>>>>>> few
>> >>>>>>>>> rows (corresponding to some time windows) missing in the Streams
>> >>>>> output.
>> >>>>>>>>> For the time windows for which the data was present, the
>> >> aggregated
>> >>>>>>>> numbers
>> >>>>>>>>> matched exactly.
>> >>>>>>>>>
>> >>>>>>>>> This means that either all the records for a particular time window
>> >>>>>>>>> are being skipped, or none are. Now this is highly unlikely to
>> >>>>>>>>> happen. Maybe there is a bug somewhere in the RocksDB state stores?
>> >>>>>>>>> Just speculation, not sure though. And there could even be a bug in
>> >>>>>>>>> the reported metric.
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>>
>
