Hi Matthias,
yes, that's exactly what I was looking for. I wasn't aware it was possible
to get the starting offset of a partition. My bad, thanks a lot.
Cheers,
Jan
-- Original message --
From: Matthias J. Sax
To: dev@kafka.apache.org
Date: 15. 2. 2017
Jan,
If I understand your problem correctly, you do something like this on
startup (simplified to a single partition):
endOffset = consumer.endOffsets(...)   // end offset of the single partition
while (!done) {
    for (ConsumerRecord r : consumer.poll()) {
        // do some processing
        if (r.offset() >= endOffset - 1) {   // end offset is the offset of the *next* record to be written
            done = true;
        }
    }
}
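For reference, the catch-up decision in the loop above can be separated from the broker interaction. This is a minimal sketch (class and method names are my own) of that decision logic, assuming the documented Kafka semantics that the end offset is the offset of the *next* record to be written, so an empty partition has beginning offset equal to end offset:

```java
public class CatchUpCheck {

    // A partition is "caught up" when either it is empty (beginning == end,
    // e.g. nothing was ever written or everything was deleted by retention),
    // or we have consumed the record at offset endOffset - 1.
    static boolean caughtUp(long lastConsumedOffset, long beginningOffset, long endOffset) {
        if (beginningOffset == endOffset) {
            return true;
        }
        return lastConsumedOffset >= endOffset - 1;
    }

    public static void main(String[] args) {
        System.out.println(caughtUp(-1L, 5L, 5L));   // empty partition: done immediately
        System.out.println(caughtUp(3L, 0L, 10L));   // still behind
        System.out.println(caughtUp(9L, 0L, 10L));   // consumed the last record
    }
}
```

The empty-partition case is exactly why the beginning offset matters: comparing only against the end offset would block forever on a partition that has no data left.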
Hi Matthias,
I understand that the local cache will not be automatically cleared, but
that is not an issue for me right now.
The problem I see is still the same as at the beginning - even caching
data to RocksDB in the Kafka Streams implementation might, I would say,
suffer from the same issue. When using
Jan,
brokers with version 0.10.1 or higher allow setting both topic cleanup
policies in combination:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
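As a hedged illustration (the retention value is my own, chosen for the example), the combined policy from KIP-71 is expressed as topic-level configuration along these lines:

```properties
# topic-level settings (requires broker 0.10.1+)
cleanup.policy=compact,delete
retention.ms=604800000   # segments older than 7 days become eligible for deletion
```

With both policies set, the log is compacted down to the latest value per key, and in addition whole segments past the retention time are deleted.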
However, this will only delete data in your changelog topic but not in
your RocksDB -- if
Hi Michael,
sorry for my late reply. Configuring the topic as you suggest is one
option (and I will configure it that way), but I wanted to combine the
two data retention mechanisms (if possible). I would like to use log
compaction, so that I will always get at least the last message for
Jan,
> - if I don't send any data to a Kafka partition for a period longer than
> the data retention interval, then all data from the partition is wiped out
If I interpret your first and second message in this email thread
correctly, then you are talking only about your "state topic" here, i.e.
Hi Matthias,
first of all, thanks for your answer. Sorry if I didn't explain the
problem well; I didn't want to dig too much into detail, so that I could
focus on the important parts, and maybe the result was unclear.
My fault, I will try to explain it again. I have two KafkaConsumers in
two separate threads
Jan,
your scenario is quite complex and I am not sure I understood every
part of it. Let me try to break it down:
> In my scenario on startup, I want to read all data from a topic (or a subset
> of its partitions),
> wait until all the old data has been cached and then start processing of a
>
Hi all,
I have a question about how to do correct caching in a KTable-like
structure on application startup. I'm not sure whether this belongs on the
user or dev mailing list, so sorry if I've chosen the wrong one. What I
have observed so far:
- if I don't send any data to a Kafka partition for a period