Hi
Does your applicationId change?
Best regards 
Patrik

> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov <afgmeis...@gmail.com>:
> 
> Hi everyone! I use kafka-streams, and i have a problem when i use
> windowedBy. Everything works well until I restart the application. After
> restarting my aggregation starts from beginning.
> Code bellow:
>> 
>>    StreamsBuilder builder = new StreamsBuilder()
>>    KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), 
>> Serdes.String()))
>> 
>>    KTable table = 
>> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>>            .aggregate(
>>            { new AggregatorModel() },
>>            { key, value, aggregate ->
>>                return aggregate.add(value)
>>            }
>>    )
>>            .toStream()
>>            .map({ k, v ->
>>        new KeyValue<>(k.window().end(), v)
>>    })
>>            .to('output')
>> 
>>    def config = new Properties()
>>    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>>    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>>    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
>> TimeUnit.SECONDS.toMillis(60))
>> 
>>    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>>    kafkaStreams.start()
>> 
>> 
> I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
> 'latest' and 'earliest' but it didn't help.
> Can you help me understand what I'm doing wrong?
> Thank you.

Reply via email to