[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-08 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153365#comment-17153365
 ] 

Di Campo edited comment on KAFKA-4273 at 7/8/20, 8:56 AM:
--

Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events in the window to check whether the last one is an unchanged duplicate. 
This trades a nice self-removal mechanism for some inefficient querying. If you 
expect a low update frequency, that may be acceptable; but with a high rate of 
duplicate updates, performance may be affected (note here: only the changes 
actually need to be stored).

To implement some kind of TTL in a KV store, I suppose you can use punctuation 
and perform the store deletions inside the scheduled punctuator. However, that 
could make for a potentially long removal pass at a certain point in time. 
(BTW, does Punctuator execution block processing? In that case it would be 
dangerous to wait too long between runs.)

All in all, TTL in KV stores would be a good, if not perfect, fit for this case :)
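As a minimal sketch of the punctuation idea above (plain Scala, not the Kafka Streams API; `Entry`, `purgeExpired`, and `ttlMs` are illustrative names): a scheduled punctuator would scan the store's entries and delete those whose last update is older than the TTL. The scan-and-delete logic is simulated here with an immutable Map standing in for the state store:

```scala
// Hypothetical sketch: what a TTL purge pass inside a Punctuator would do.
// In Kafka Streams the loop would run over store.all() and call
// store.delete(key); here a plain Map simulates the store.
object TtlPurgeSketch {
  // A stored value together with its last-update timestamp.
  final case class Entry[V](value: V, lastUpdateMs: Long)

  // Return the store with expired entries removed, plus the expired keys.
  def purgeExpired[K, V](store: Map[K, Entry[V]],
                         nowMs: Long,
                         ttlMs: Long): (Map[K, Entry[V]], Seq[K]) = {
    val expired = store.collect {
      case (k, e) if nowMs - e.lastUpdateMs > ttlMs => k
    }.toSeq
    (store -- expired, expired)
  }
}
```

Note that, as the comment above says, this purge is O(store size) per punctuation, so the punctuation interval trades cleanup latency against the length of each removal pass.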



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to 
> define TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:28 PM:
--

Thanks Matthias. 
In my case I want emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the 
best semantics for holding one element only. 
I will try to overcome that by removing the old element before updating. Is 
that performant and safe enough? 
Code example: 


def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // Assign to the outer var (the original re-declared it with `var`,
      // shadowing the outer one so the finally block always closed null).
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      if (timeIterator.hasNext()) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // Unchanged duplicate: emit nothing.
          null.asInstanceOf[V]
        } else {
          // Changed: replace the stored element and forward the new value.
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}








[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-06-30 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801
 ] 

Di Campo edited comment on KAFKA-4273 at 6/30/20, 4:12 PM:
---

Any news or an improved workaround? I am also interested in this. 
 In my case, I think my preferred workaround would be the one explained 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]





