[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027841#comment-16027841 ]

Ayush Verma commented on KAFKA-4273:
---

I have a use case where I am storing some aggregates for flights (e.g. minimum fares, trend, etc.) against the key origin_destination_departureDate. Now, naturally for flights with expired departure dates (

> Streams DSL - Add TTL / retention period support for intermediate topics and state stores
> ---
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Davor Poldrugo
>
> Hi!
> I'm using the Streams DSL (0.10.0.1), which, as far as I know, can only use RocksDB for local state; it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that, the data can be discarded.
> I join two topics, "messages" and "prices", using a windowed inner join.
> The two intermediate Kafka topics for this join are named:
> * messages-prices-join-this-changelog
> * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't want to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or to periodically clean up the older data.
> A "hack" that I use to keep my disk usage low: I have scheduled a job to periodically stop Kafka Streams instances, one at a time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions that it wasn't processing before the stop, which leads to deletion of the RocksDB state store for those partitions (state.cleanup.delay.ms). In the next rebalance the local store is recreated with a restore consumer, which reads data from the (as previously mentioned) non-compacted topic. This effectively gives me a "hacked TTL support" in the Kafka Streams DSL.
> Questions:
> * Would it be reasonable to add support in the DSL API to define a TTL for local stores?
> * Which opens another question: there are use cases which don't need the intermediate topics to be created as "compact". Could this also be added to the DSL API? Maybe only this could be added, and the same flag could also be used for the RocksDB TTL. Of course, in this case another config would be mandatory: the retention period or TTL for the intermediate topics and the state stores. I saw there is a new cleanup.policy, compact_and_delete, added with KAFKA-4015.
> * Which also leads to another question: maybe some intermediate topics / state stores need different TTLs, so it's not as simple as that. But after KAFKA-3870, it will be easier.
> RocksDB supports TTL:
> * https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
> * https://github.com/facebook/rocksdb/wiki/Time-to-Live
> * https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
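The TTL semantics requested in this issue can be illustrated with a small in-memory model. The sketch below assumes the behaviour described on the RocksDB Time-to-Live wiki page linked above: values are stamped with their write time, an entry becomes eligible for deletion once write time + TTL is earlier than the current time, and deletion only happens during compaction, so reads may still return expired entries until then. `TtlKeyValueStore` and its `purge` method (standing in for compaction) are hypothetical names; this is not how Kafka Streams or RocksDB implement it.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of a write-time TTL store (not a Kafka or RocksDB API).
// Entries are stamped on write and only removed when purge() runs, which plays the
// role that compaction plays for RocksDB's TtlDB.
final class TtlKeyValueStore<K, V> {
    private static final class Stamped<T> {
        final T value;
        final long writtenAtMs;

        Stamped(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Map<K, Stamped<V>> entries = new HashMap<>();
    private final long ttlMs;

    TtlKeyValueStore(long ttlMs) {
        if (ttlMs <= 0) throw new IllegalArgumentException("ttlMs must be positive");
        this.ttlMs = ttlMs;
    }

    // Every write refreshes the timestamp, so an updated key lives for another ttlMs.
    void put(K key, V value, long nowMs) {
        entries.put(key, new Stamped<>(value, nowMs));
    }

    // Reads do not check expiry: an expired entry stays visible until purge() runs.
    Optional<V> get(K key) {
        Stamped<V> s = entries.get(key);
        return s == null ? Optional.empty() : Optional.of(s.value);
    }

    // Removes every entry with writtenAt + ttl < now and returns how many were removed.
    int purge(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<K, Stamped<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().writtenAtMs + ttlMs < nowMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

For the 48-hour retention in the issue, the store would be created with `TimeUnit.HOURS.toMillis(48)` (172,800,000 ms). Because removal is deferred to the purge step, a record in such a model can remain readable for some time after its TTL has elapsed.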
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010005#comment-16010005 ]

Guozhang Wang commented on KAFKA-4273:
---

[~vinubarro] I'm wondering whether your use case can be handled by Streams' session window aggregations? Details can be found at: https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows

In general, you can keep your aggregate results for as long as X time units after there are no more updates on the aggregated keys.
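The session-window suggestion rests on two rules: records for a key that arrive within an inactivity gap of each other merge into one session, and a session's aggregate only has to be kept for a retention period after its last update. The plain-Java model below illustrates just those two rules under simplifying assumptions (timestamps for a single key, already sorted ascending); `SessionWindowSketch` is a hypothetical name, and this is neither the KIP-94 API nor its implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of session-window grouping and retention (not the Kafka Streams API).
final class SessionWindowSketch {
    // A session spans [start, end] and counts the events that fell inside it.
    static final class Session {
        final long start;
        final long end;
        final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    // Groups sorted event timestamps for one key into sessions: a new session starts
    // whenever the gap since the previous event exceeds inactivityGapMs.
    static List<Session> sessions(long[] sortedTimestamps, long inactivityGapMs) {
        List<Session> out = new ArrayList<>();
        if (sortedTimestamps.length == 0) return out;
        long start = sortedTimestamps[0];
        long end = start;
        int count = 1;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long t = sortedTimestamps[i];
            if (t - end > inactivityGapMs) {
                out.add(new Session(start, end, count)); // close the current session
                start = t;
                count = 0;
            }
            end = t;
            count++;
        }
        out.add(new Session(start, end, count));
        return out;
    }

    // The aggregate for a session only needs to be kept until retentionMs after its last update.
    static boolean retained(Session s, long nowMs, long retentionMs) {
        return nowMs <= s.end + retentionMs;
    }
}
```

In the DSL these rules would be expressed through the KIP-94 session-window operators rather than hand-written code; the sketch only makes the grouping and retention logic explicit.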
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16002747#comment-16002747 ]

Vinoth Rajasekar commented on KAFKA-4273:
---

We have a use case where we need to do a distinct count of user-agent strings by IP. In a day, we have to do this type of aggregation on 50 million unique IPs on average, for many fields. As some IPs get more than 5 unique user agents, it is at times tough to fit these records in memory and do a KTable aggregation. For distinct counts, the original values need to be retained in a hash set per record. To avoid memory issues, I'm using the low-level Processor API and writing the aggregated output to a RocksDB data store. I want to keep these records active only for a 24-hour period and delete/expire them afterwards from the RocksDB data store. There is currently no option to set custom TTL values to expire the records. This is a nice-to-have feature. Any idea when this will be available in production?
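One way to bound the memory and disk use described here, assuming calendar-day granularity is acceptable, is to bucket the per-IP sets by day and drop whole buckets once they fall out of the retention window, instead of expiring individual records. The sketch below is a hypothetical in-memory illustration of that bucketing idea (`DailyDistinctCounter` is not a Kafka class); in a Processor API application the buckets would live in a state store rather than a `TreeMap`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: distinct user agents per IP, bucketed by day so that a whole
// day's data can be dropped at once instead of expiring individual records.
final class DailyDistinctCounter {
    private static final long DAY_MS = 24L * 60 * 60 * 1000;

    // dayIndex -> (ip -> distinct user agents seen that day)
    private final TreeMap<Long, Map<String, Set<String>>> buckets = new TreeMap<>();
    private final int daysToKeep;

    DailyDistinctCounter(int daysToKeep) {
        if (daysToKeep < 1) throw new IllegalArgumentException("daysToKeep must be >= 1");
        this.daysToKeep = daysToKeep;
    }

    // Records one (ip, userAgent) observation; returns the ip's distinct count for that day.
    int add(String ip, String userAgent, long timestampMs) {
        long day = Math.floorDiv(timestampMs, DAY_MS);
        Set<String> agents = buckets
                .computeIfAbsent(day, d -> new HashMap<>())
                .computeIfAbsent(ip, k -> new HashSet<>());
        agents.add(userAgent);
        return agents.size();
    }

    // Distinct count for the ip on the day containing timestampMs (0 if nothing is stored).
    int distinctCount(String ip, long timestampMs) {
        Map<String, Set<String>> bucket = buckets.get(Math.floorDiv(timestampMs, DAY_MS));
        if (bucket == null) return 0;
        Set<String> agents = bucket.get(ip);
        return agents == null ? 0 : agents.size();
    }

    // Drops every day bucket older than the retained window ending at nowMs;
    // returns the number of buckets removed.
    int expire(long nowMs) {
        long oldestKept = Math.floorDiv(nowMs, DAY_MS) - (daysToKeep - 1);
        Map<Long, Map<String, Set<String>>> old = buckets.headMap(oldestKept, false);
        int removed = old.size();
        old.clear(); // clearing the head view removes those buckets from the map
        return removed;
    }
}
```

Day buckets only approximate a rolling 24-hour window: with `daysToKeep = 1`, an observation made just before midnight expires shortly afterwards, whereas `daysToKeep = 2` keeps every observation for more than 24 and at most 48 hours.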
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688015#comment-15688015 ]

ASF GitHub Bot commented on KAFKA-4273:
---

GitHub user dpoldrugo opened a pull request:

https://github.com/apache/kafka/pull/2159

KAFKA 4273 - Add TTL support for RocksDB

Since the Streams DSL doesn't support fine-grained configuration of state stores (it uses only RocksDB), I have added a new StreamsConfig called `rocksdb.ttl.sec`, which allows you to set a TTL for all state stores used by the topology. In short, if you set this property to a value `>=1`, TtlDB is used instead of RocksDB, and records expire after the defined period.

This should help users bound their disk usage, and it provides a configuration for use cases where the data has a natural TTL/retention: for example, when you process data for only one hour and don't need it in the state stores after that.

I have added a [test](https://github.com/apache/kafka/compare/trunk...dpoldrugo:KAFKA-4273-ttl-support?expand=1#diff-d908a80c770d196ac823752da3b3a864R117) to check whether TtlDB expires records, but I can't make TtlDB expire a record within a reasonable window (1 minute). Do you have any suggestions on how to force TtlDB to expire records more quickly?

Since I'm using Kafka and Kafka Streams 0.10.1.0, I have also added this code to the [0.10.1](https://github.com/dpoldrugo/kafka/tree/0.10.1-KAFKA-4273-ttl-support) branch, and if the review goes well I hope it can be added to the 0.10.1.1 release.

The patch is here: [KAFKA_4273_Add_TTL_support_for_RocksDB_v2.patch.txt](https://github.com/apache/kafka/files/607638/KAFKA_4273_Add_TTL_support_for_RocksDB_v2.patch.txt)

**Suggestion for future work**

Since this config/feature applies to all state stores, it would be nice to provide an API for users to configure the TTL of each state store, for example during topology building with KStreamBuilder.

Now: KStreamBuilder#table(String topic, final String storeName)

Suggestion: KStreamBuilder#table(String topic, final String storeName, **_StoreOptions_ storeOptions**)

Where **_StoreOptions_** would be something like this: `{ ttlSeconds: int }`

More details: [KAFKA-4273](https://issues.apache.org/jira/browse/KAFKA-4273)

@guozhangwang @dguy @mjsax @norwood @enothereska @ijuma - could you check this out?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dpoldrugo/kafka KAFKA-4273-ttl-support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2159.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2159

commit 5a3f1372daf2a0e939b246756c7e712e9ea21662
Author: dpoldrugo
Date: 2016-11-22T21:01:02Z

KAFKA 4273 - Add TTL support for RocksDB
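The selection rule described in the pull request (a `rocksdb.ttl.sec` value `>=1` switches the store from RocksDB to TtlDB) and the per-store `StoreOptions` proposed under "Suggestion for future work" can be sketched in plain Java. `TtlConfigSketch`, `Backend` and this `StoreOptions` class are hypothetical illustrations, not Kafka APIs, and the sketch does not open any database. For context, in the RocksDB Java binding the TTL is supplied when the database is opened (`TtlDB.open(options, path, ttlSeconds, readOnly)`), so the choice between the two backends has to be made per store at open time.

```java
import java.util.Properties;

// Hypothetical sketch of the backend-selection rule from the pull request; not Kafka code.
final class TtlConfigSketch {
    // Config name proposed in the pull request.
    static final String ROCKSDB_TTL_SEC = "rocksdb.ttl.sec";

    enum Backend { ROCKSDB, TTLDB }

    // Global rule: a value >= 1 selects TtlDB; an absent or smaller value keeps plain RocksDB.
    static Backend backendFor(Properties props) {
        String raw = props.getProperty(ROCKSDB_TTL_SEC);
        if (raw == null) return Backend.ROCKSDB;
        int ttlSeconds = Integer.parseInt(raw.trim());
        return ttlSeconds >= 1 ? Backend.TTLDB : Backend.ROCKSDB;
    }

    // Per-store alternative shaped like the suggested { ttlSeconds: int };
    // 0 means "no TTL". Hypothetical, not a Kafka API.
    static final class StoreOptions {
        final int ttlSeconds;

        StoreOptions(int ttlSeconds) {
            if (ttlSeconds < 0) throw new IllegalArgumentException("ttlSeconds must be >= 0");
            this.ttlSeconds = ttlSeconds;
        }

        Backend backend() {
            return ttlSeconds >= 1 ? Backend.TTLDB : Backend.ROCKSDB;
        }
    }
}
```

A per-store option like this would let, for example, a 48-hour join store (`new StoreOptions(172800)`) coexist with stores that keep data forever, which a single global setting cannot express.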
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15602159#comment-15602159 ]

Davor Poldrugo commented on KAFKA-4273:
---

Could this possibly be resolved in 0.10.1.0 with the streams config *rocksdb.config.setter*?
http://kafka.apache.org/documentation#streamsconfigs

Description of the config: _A Rocks DB config setter class that implements the RocksDBConfigSetter interface_

The interface is here: https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java

> Streams DSL - Add TTL / retention period support for intermediate topics and state stores
> ---
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Davor Poldrugo
> Assignee: Guozhang Wang
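For reference, the config setter is registered through the ordinary streams properties, and the interface linked above declares a single callback, `setConfig(String storeName, Options options, Map<String, Object> configs)`, which receives the RocksDB `Options` for each store it configures. The sketch below only builds those properties with plain `java.util.Properties`; the application id, broker address and setter class name are placeholders, and `state.cleanup.delay.ms` is included because the restart workaround in the issue description depends on it.

```java
import java.util.Properties;

// Sketch of registering a RocksDBConfigSetter through the streams properties.
// The application id, broker address and setter class name are placeholders.
final class StreamsPropsSketch {
    static Properties build(String configSetterClassName) {
        Properties props = new Properties();
        props.setProperty("application.id", "messages-prices-join"); // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");     // placeholder
        // Fully qualified name of a class implementing
        // org.apache.kafka.streams.state.RocksDBConfigSetter.
        props.setProperty("rocksdb.config.setter", configSetterClassName);
        // How long to wait before deleting state for a partition that has migrated
        // away from this instance; 10 minutes here, chosen arbitrarily.
        props.setProperty("state.cleanup.delay.ms", String.valueOf(10 * 60 * 1000L));
        return props;
    }
}
```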