Oh good point!

The reason there is only one row per time window is that the table only
contains the latest value for each window. We dumped the data from the
sink topic into a DB using an upsert query, with the time window as the
primary key of the table. The file I attached is the data present in that
DB. And we know there is no bug in our DB dump code, because we have been
using it in production for a long time without any issues.
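
Conceptually, the dump step looks something like the sketch below. The
table and column names are placeholders for illustration, and it assumes
a Postgres-style ON CONFLICT upsert; any equivalent upsert works the same
way.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class WindowCountUpserter {

    // Placeholder table/column names; window_start is the primary key,
    // so each new value for a window overwrites the previous row.
    private static final String UPSERT_SQL =
        "INSERT INTO window_counts (window_start, count) VALUES (?, ?) "
            + "ON CONFLICT (window_start) DO UPDATE SET count = EXCLUDED.count";

    public static void upsert(Connection conn, Timestamp windowStart, long count)
            throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            ps.setTimestamp(1, windowStart);
            ps.setLong(2, count);
            ps.executeUpdate();
        }
    }
}

Because the time window is the primary key, later updates for the same
window simply overwrite earlier ones, which is why the dump contains
exactly one row per window.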

The reason the count is zero for some time windows is that, for privacy
reasons, I subtracted a random number from the actual values and rounded
the result off to zero. The actual data doesn't have any zero values. I
should have mentioned this earlier. My bad!

The stream topology code looks something like this.

stream
    .filter()
    .map((key, value) -> new KeyValue<>(transform(key), value))
    .groupByKey()
    .aggregate(HashSet::new, new AggregatorFunction(),
               TimeWindows.of(60000).until(3600000))
    .mapValues(HashSet::size)
    .toStream()
    .map((key, value) -> convertToProtobufObject(key, value))
    .to()
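
For completeness, AggregatorFunction just accumulates values into the
per-window HashSet, so that mapValues(HashSet::size) yields a distinct
count per window. Roughly like this (the types and the exact field we add
from each record are placeholders here):

import java.util.HashSet;
import org.apache.kafka.streams.kstream.Aggregator;

// Adds each record's value to the per-window set; the distinct count
// is taken downstream via mapValues(HashSet::size).
class AggregatorFunction implements Aggregator<String, String, HashSet<String>> {
    @Override
    public HashSet<String> apply(String key, String value, HashSet<String> aggregate) {
        aggregate.add(value); // placeholder: in reality we add a field parsed from the record
        return aggregate;
    }
}

The TimeWindows.of(60000).until(3600000) part gives us 1-minute windows
retained for 1 hour.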

On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the details (sorry that I forgot that you did share the
> output already).
>
> Might be a dumb question, but what is the count for missing windows in
> your second implementation?
>
> If there is no data for a window, it should not emit a window with count
> zero; it should emit nothing at all.
>
> Thus, looking at your output, I am wondering how it could contain a line
> like:
>
> > 2017-04-27T04:53:00 0
>
> I am also wondering why your output only contains a single value per
> window. As Streams outputs multiple updates per window while the count
> is increasing, you should actually see multiple records per window.
>
> Your code is like this:
>
> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>
> Or do you have something more complex?
>
>
> -Matthias
>
>
> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
> >> Can you somehow verify your output?
> >
> >
> > Do you mean the Kafka Streams output? In the Kafka Streams output, we do
> > see some missing values. I have attached the Kafka Streams output (for a
> > few hours) in the very first email of this thread for reference.
> >
> > Let me also summarise what we have done so far.
> >
> > We took a dump of the raw data present in the source topic. We wrote a
> > script to read this data and do the exact same aggregations that we do
> > using Kafka Streams. And then we compared the output from Kafka Streams
> > and our script.
> >
> > The difference that we observed in the two outputs is that there were a
> > few rows (corresponding to some time windows) missing in the Streams
> > output. For the time windows for which the data was present, the
> > aggregated numbers matched exactly.
> >
> > This means either all the records for a particular time window are being
> > skipped, or none are. That is highly unlikely to happen. Maybe there is a
> > bug somewhere in the RocksDB state stores? Just speculation, not sure
> > though. There could even be a bug in the reported metric.
> >
>
>
