Short answer seems to be that my Kafka log retention time was short enough
that the metrics I was writing were getting purged from Kafka during the
test, before the streams application could read them. Hence the dropped
metrics.
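
For anyone hitting the same thing, one way to check and raise the retention
on the input topic (topic name and retention value below are placeholders,
not my actual settings):

```
# Inspect current topic-level overrides (0.10.x tooling)
bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
    --entity-type topics --entity-name metrics-ingest

# Raise retention to 7 days so records survive long reprocessing runs
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name metrics-ingest \
    --add-config retention.ms=604800000
```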

On Thu, Jun 15, 2017 at 1:32 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> I have encapsulated the repro into a small self contained project:
> https://github.com/cwelton/kstreams-repro
>
> Thanks,
>   Caleb
>
>
> On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:
>
>> I do have a TimestampExtractor set up, and for the 10-second windows that
>> are emitted all the values expected in those windows are present, e.g.
>> each 10-second window gets 100 values aggregated into it.
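>>
>> For reference, a minimal sketch of the kind of extractor I have in mind
>> (Metric.getTimestamp() is an assumed accessor used for illustration, and
>> the config name is the 0.10.2 one):
>>
>> ```
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.streams.processor.TimestampExtractor;
>>
>> // Pulls event time out of the Metric payload instead of using ingest time.
>> public class MetricTimestampExtractor implements TimestampExtractor {
>>     @Override
>>     public long extract(final ConsumerRecord<Object, Object> record,
>>                         final long previousTimestamp) {
>>         final Object value = record.value();
>>         if (value instanceof Metric) {
>>             // getTimestamp() is assumed to return event time in epoch millis.
>>             return ((Metric) value).getTimestamp();
>>         }
>>         // Fall back to the timestamp embedded in the record itself.
>>         return record.timestamp();
>>     }
>> }
>>
>> // Registered via:
>> // streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>> //                          MetricTimestampExtractor.class.getName());
>> ```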
>>
>> I have no metrics with null keys or values.
>>
>> I will try to get the entire reproduction case packaged up in a way that
>> I can more easily share.
>>
>>
>> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Another thing to consider: do you have records with null key or value?
>>> Those records would be dropped and not processed.
>>>
>>> -Matthias
>>>
>>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>>> > Is your time usage correct?  It sounds like you want event time, not
>>> > load/process time, which is the default unless you have a
>>> > TimestampExtractor defined somewhere upstream.  Otherwise I could see
>>> > far fewer events coming out, as streams is just aggregating whatever
>>> > showed up in that 10 second window.
>>> >
>>> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
>>> > wrote:
>>> >
>>> >> Disabling the cache with:
>>> >>
>>> >> ```
>>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> >> ```
>>> >>
>>> >> Results in:
>>> >> - Emitting many more intermediate calculations.
>>> >> - Still losing data.
>>> >>
>>> >> In my test case it output 342,476 intermediate calculations for 3,414
>>> >> distinct windows, where 14,400 distinct windows were expected.
>>> >>
>>> >> Regards,
>>> >>   Caleb
>>> >>
>>> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
>>> >> wrote:
>>> >>
>>> >>> This seems to be related to internal KTable caches. You can disable
>>> >>> them by setting cache size to zero.
>>> >>>
>>> >>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
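>>> >>>
>>> >>> For reference, a minimal sketch of the two settings that page covers
>>> >>> (constant names as in the 0.10.2 StreamsConfig; the values are
>>> >>> examples only):
>>> >>>
>>> >>> ```
>>> >>> // Turn off record caching so every update is forwarded downstream ...
>>> >>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> >>> // ... or keep the cache but commit/flush more often (milliseconds).
>>> >>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
>>> >>> ```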
>>> >>>
>>> >>> -Matthias
>>> >>>
>>> >>>
>>> >>>
>>> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>> >>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
>>> >>>> problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
>>> >>>> or higher the problem shows up.
>>> >>>>
>>> >>>> When the number of threads is 1, the speed of data through the first
>>> >>>> part of the topology (before the KTable) slows down considerably, but
>>> >>>> it seems to slow down to the speed of the output, which may be the key.
>>> >>>>
>>> >>>> That said... changing the number of stream threads should not impact
>>> >>>> data correctness.  Seems like a bug someplace in Kafka.
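>>> >>>>
>>> >>>> For reference, the setting being toggled between runs (a sketch;
>>> >>>> streamsConfiguration is the Streams Properties object for the
>>> >>>> application):
>>> >>>>
>>> >>>> ```
>>> >>>> // 1 thread: all expected windows are emitted.
>>> >>>> // 2 or more threads: windows go missing in this test.
>>> >>>> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>>> >>>> ```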
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> I have a topology of
>>> >>>>>     KStream -> KTable -> KStream
>>> >>>>>
>>> >>>>> ```
>>> >>>>> final KStreamBuilder builder = new KStreamBuilder();
>>> >>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>>> >>>>>
>>> >>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>> >>>>>         .groupByKey(stringSerde, mySerde)
>>> >>>>>         .reduce(MyThing::merge,
>>> >>>>>                 TimeWindows.of(10000).advanceBy(10000)
>>> >>>>>                         .until(Duration.ofDays(retentionDays).toMillis()),
>>> >>>>>                 tableTopic);
>>> >>>>>
>>> >>>>> myTable.toStream()
>>> >>>>>         .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
>>> >>>>>         .to(stringSerde, mySerde, sinkTopic);
>>> >>>>> ```
>>> >>>>>
>>> >>>>>
>>> >>>>> Normally, when sending data at 10 messages a second, I expect ~1
>>> >>>>> output metric for every 100 metrics received, based on the 10-second
>>> >>>>> window width.
>>> >>>>>
>>> >>>>> When fed data in real time at that rate it seems to do just that.
>>> >>>>>
>>> >>>>> However, when I either reprocess an input topic with a large amount
>>> >>>>> of data or feed data in significantly faster, I see a very different
>>> >>>>> behavior.
>>> >>>>>
>>> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being
>>> >>>>> ingested into the KTable, but only 633 emitted from it (expected
>>> >>>>> 14,400).
>>> >>>>>
>>> >>>>> Over the next minute the record output creeps up to 1,796, but then
>>> >>>>> holds steady and does not keep climbing toward the expected total of
>>> >>>>> 14,400.
>>> >>>>>
>>> >>>>> A consumer reading from the sinkTopic ends up finding about 1,264
>>> >>>>> records, which is lower than the 1,796 I would have anticipated from
>>> >>>>> the number of calls into the final KStream map function.
>>> >>>>>
>>> >>>>> The precise number of emitted records varies from one run to the next.
>>> >>>>>
>>> >>>>> Where are the extra metrics going?  Is there some commit issue that is
>>> >>>>> causing dropped messages if the KTable producer isn't able to keep up?
>>> >>>>>
>>> >>>>> Any recommendations on where to focus the investigation of the issue?
>>> >>>>>
>>> >>>>> Running Kafka 0.10.2.1.
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>>   Caleb
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>>
>>
>
