[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2019-04-02 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16808070#comment-16808070
 ] 

Sophie Blee-Goldman commented on KAFKA-6579:


[~benjaminedwardwebb] Are you still working on this? If not, I would be happy to 
take over (I was just adding yet another independent unit test class for a 
session store and figured it was time to knock this out).

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Ben Webb
>Priority: Major
>  Labels: newbie, unit-test
>
> For key-value stores, we have an {{AbstractKeyValueStoreTest}} that is shared 
> among all implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8184) Update IQ docs to include session stores

2019-04-02 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8184:
--

 Summary: Update IQ docs to include session stores
 Key: KAFKA-8184
 URL: https://issues.apache.org/jira/browse/KAFKA-8184
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


The Interactive Queries docs are out of date, and currently only cover 
queryable key-value and window stores. Session stores can also be queried and 
should be included on this page.





[jira] [Created] (KAFKA-8174) Can't call arbitrary SimpleBenchmarks tests from streams_simple_benchmark_test.py

2019-03-28 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8174:
--

 Summary: Can't call arbitrary SimpleBenchmarks tests from 
streams_simple_benchmark_test.py
 Key: KAFKA-8174
 URL: https://issues.apache.org/jira/browse/KAFKA-8174
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


When using the script streams_simple_benchmark_test.py you should be able to 
specify a test name and run that particular method in SimpleBenchmarks. This 
works for most existing benchmarks; however, you can't use it to run the 
"yahoo" benchmark, and you can't add new tests to SimpleBenchmarks and start 
them successfully.

 

If you try to run the yahoo benchmark or a new test, it fails in main() with the 
error "Not enough parameters are provided; expecting propFileName, testName, 
numRecords, keySkew, valueSize"; the missing argument turns out to be testName.





[jira] [Updated] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering

2019-03-28 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8159:
---
Description: 
Currently we assume consistent ordering between serialized and deserialized 
keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes 
will also obey bytesA < bytesB < bytesC. This is not true in general of the 
built-in serdes for signed numerical types (eg Integer, Long). Specifically, it 
is broken by the negative number representations which are lexicographically 
greater than (all) positive number representations. 

 

One consequence of this is that an interactive query of a key range with a 
negative lower bound and positive upper bound (eg keyValueStore.range(-1, 1)) 
will result in "unexpected behavior" depending on the specific store type.

 

For RocksDB stores with caching disabled, an empty iterator will be returned 
regardless of whether any records do exist in that range. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

  was:
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, an empty iterator will be returned. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consistent across types of 
store.


> Built-in serdes for signed numbers do not obey lexicographical ordering
> ---
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Currently we assume consistent ordering between serialized and deserialized 
> keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes 
> will also obey bytesA < bytesB < bytesC. This is not true in general of the 
> built-in serdes for signed numerical types (eg Integer, Long). Specifically, 
> it is broken by the negative number representations which are 
> lexicographically greater than (all) positive number representations. 
>  
> One consequence of this is that an interactive query of a key range with a 
> negative lower bound and positive upper bound (eg keyValueStore.range(-1, 1)) 
> will result in "unexpected behavior" depending on the specific store type.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned 
> regardless of whether any records do exist in that range. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.
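
The ordering mismatch described above can be reproduced without Kafka. The following is a minimal sketch, assuming the built-in Integer serde writes a 4-byte big-endian two's-complement value and that byte keys are compared as unsigned bytes, lexicographically. The class and method names are hypothetical and not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class SignedKeyOrdering {

    // Big-endian two's-complement encoding of an int (4 bytes, most significant first).
    // Assumption: this mirrors the byte layout of the built-in Integer serde.
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Unsigned lexicographic comparison of the serialized forms,
    // i.e. the order a byte-keyed store is assumed to use.
    static int compareSerialized(int a, int b) {
        return Arrays.compareUnsigned(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        // As objects, -1 sorts before 1.
        System.out.println("object order:     " + Integer.compare(-1, 1));
        // As bytes, -1 is FF FF FF FF and 1 is 00 00 00 01, so -1 sorts after 1.
        System.out.println("serialized order: " + compareSerialized(-1, 1));
    }
}
```

Under these assumptions a call such as range(-1, 1) hands the store a lower bound that sorts after the upper bound, which is consistent with both the empty iterator and the exception reported above.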





[jira] [Updated] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering

2019-03-28 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8159:
---
Summary: Built-in serdes for signed numbers do not obey lexicographical 
ordering  (was: Multi-key range queries with negative keyFrom results in 
unexpected behavior)

> Built-in serdes for signed numbers do not obey lexicographical ordering
> ---
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (eg Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However if the 
> user tries to query the store for a range of keys starting with a negative 
> number, unexpected behavior results that is store-specific.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation and the result should at least be consistent across types of 
> store.





[jira] [Created] (KAFKA-8159) Multi-key range queries with negative keyFrom results in unexpected behavior

2019-03-26 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8159:
--

 Summary: Multi-key range queries with negative keyFrom results in 
unexpected behavior
 Key: KAFKA-8159
 URL: https://issues.apache.org/jira/browse/KAFKA-8159
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, Streams will silently miss the 
negative keys and return those from the range [0, keyTo]. 

 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consistent across types of 
store.





[jira] [Updated] (KAFKA-8159) Multi-key range queries with negative keyFrom results in unexpected behavior

2019-03-26 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8159:
---
Description: 
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, Streams will silently miss the 
negative keys and return those from the range [0, keyTo]. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consistent across types of 
store.

  was:
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, Streams will silently miss the 
negative keys and return those from the range [0, keyTo]. 

 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consistent across types of 
store.


> Multi-key range queries with negative keyFrom results in unexpected behavior
> 
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: streams
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (eg Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However if the 
> user tries to query the store for a range of keys starting with a negative 
> number, unexpected behavior results that is store-specific.
>  
> For RocksDB stores with caching disabled, Streams will silently miss the 
> negative keys and return those from the range [0, keyTo]. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation and the result should at least be consistent across types of 
> store.





[jira] [Updated] (KAFKA-8159) Multi-key range queries with negative keyFrom results in unexpected behavior

2019-03-27 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8159:
---
Description: 
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, an empty iterator will be returned. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consistent across types of 
store.

  was:
If a user creates a queryable state store using one of the signed built-in 
serdes (eg Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However if the 
user tries to query the store for a range of keys starting with a negative 
number, unexpected behavior results that is store-specific.

 

For RocksDB stores with caching disabled, Streams will silently miss the 
negative keys and return those from the range [0, keyTo]. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation and the result should at least be consistent across types of 
store.


> Multi-key range queries with negative keyFrom results in unexpected behavior
> 
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (eg Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However if the 
> user tries to query the store for a range of keys starting with a negative 
> number, unexpected behavior results that is store-specific.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation and the result should at least be consistent across types of 
> store.





[jira] [Commented] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2019-04-01 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16807155#comment-16807155
 ] 

Sophie Blee-Goldman commented on KAFKA-8147:


[~mjduijn] Just send an email to 
[d...@kafka.apache.org.|mailto:d...@kafka.apache.org.] The thread should show 
up [here|[https://www.mail-archive.com/dev@kafka.apache.org/index.html]] once 
you do, so you can go back and add the link to your KIP

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Assignee: Maarten
>Priority: Minor
>  Labels: needs-kip
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e., 
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}





[jira] [Comment Edited] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2019-04-01 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16807155#comment-16807155
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8147 at 4/1/19 8:30 PM:


[~mjduijn] Just send an email to 
[d...@kafka.apache.org.|mailto:d...@kafka.apache.org.] The thread should show 
up here ([https://www.mail-archive.com/dev@kafka.apache.org/index.html]) once 
you do, so you can go back and add the link to your KIP.

 

 


was (Author: ableegoldman):
[~mjduijn] Just send an email to 
[d...@kafka.apache.org.|mailto:d...@kafka.apache.org.] The thread should show 
up [here|[https://www.mail-archive.com/dev@kafka.apache.org/index.html]] once 
you do, so you can go back and add the link to your KIP

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Assignee: Maarten
>Priority: Minor
>  Labels: needs-kip
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e., 
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}





[jira] [Commented] (KAFKA-8159) Multi-key range queries with negative keyFrom results in unexpected behavior

2019-03-26 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16802286#comment-16802286
 ] 

Sophie Blee-Goldman commented on KAFKA-8159:


Consider also that a user may not explicitly want or need to search a negative 
keyspace, or have any negative keys, but could still find their streams app 
crashing from a query of the range [key - delta, key + delta] if they ever see 
a key < delta.

> Multi-key range queries with negative keyFrom results in unexpected behavior
> 
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (eg Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However if the 
> user tries to query the store for a range of keys starting with a negative 
> number, unexpected behavior results that is store-specific.
>  
> For RocksDB stores with caching disabled, Streams will silently miss the 
> negative keys and return those from the range [0, keyTo]. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation and the result should at least be consistent across types of 
> store.





[jira] [Resolved] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-03-01 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7918.

Resolution: Fixed

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Resolved] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-03-01 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7912.

Resolution: Fixed

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.
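
The hazard with returning an iterator over a subMap view can be shown in a single thread. This is a minimal sketch, assuming the backing map is a plain java.util.TreeMap (the issue only says "a Map"); the class name is hypothetical. With real concurrent threads the failure is not guaranteed to be this clean, since fail-fast detection is best effort.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SubMapViewDemo {

    // Returns true if a write to the backing map, made while a range
    // iterator is open, causes that iterator to fail on its next step.
    static boolean iteratorFailsAfterWrite() {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");

        // subMap returns a live view of the store, not a copy.
        Iterator<Map.Entry<String, String>> range =
            store.subMap("a", true, "c", true).entrySet().iterator();

        range.next();          // the reader consumes "a"
        store.put("bb", "4");  // a writer inserts a new key into the backing map

        try {
            range.next();      // the view iterator notices the structural change
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("iterator failed after write: " + iteratorFailsAfterWrite());
    }
}
```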





[jira] [Created] (KAFKA-7963) Extract hard-coded strings to centralized place

2019-02-20 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-7963:
--

 Summary: Extract hard-coded strings to centralized place
 Key: KAFKA-7963
 URL: https://issues.apache.org/jira/browse/KAFKA-7963
 Project: Kafka
  Issue Type: Improvement
Reporter: Sophie Blee-Goldman


Several string literals are hard-coded into the metrics, eg 
"expired-window-record-drop" and "late-record-drop" in the window bytes stores. 
These should be moved to a sensible central location; the widespread string 
literals from these metrics may also be causing memory pressure.
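
One possible shape for such a central location is a constants holder. This is only a sketch: the class name and field names are hypothetical, and only the two metric name strings come from the issue text.

```java
// Hypothetical holder for metric names that are currently repeated as literals.
final class StoreMetricNames {

    static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";
    static final String LATE_RECORD_DROP = "late-record-drop";

    // Not meant to be instantiated.
    private StoreMetricNames() { }
}
```

Call sites would then refer to StoreMetricNames.LATE_RECORD_DROP rather than repeating the literal, so a rename touches one line.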





[jira] [Created] (KAFKA-8007) In-memory window store copies on fetch

2019-02-26 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8007:
--

 Summary: In-memory window store copies on fetch
 Key: KAFKA-8007
 URL: https://issues.apache.org/jira/browse/KAFKA-8007
 Project: Kafka
  Issue Type: Improvement
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman


The current implementation of InMemoryWindowStore copies all record data into a 
new list and returns an iterator over that list when fetching. That is 
inefficient in terms of both memory and time. The fetching logic should be 
moved to the iterators, which could be cleaned up as well (currently they rely 
on ListIterator to provide peekNextKey() functionality, which is unnecessary). 
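
As an illustration of providing peekNextKey() without ListIterator and without copying records into a list, a one-element lookahead over any entry iterator is enough. This is a sketch under assumptions: the class name is hypothetical, it is not the InMemoryWindowStore code, and it assumes the underlying entries are never null.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Wraps an entry iterator and buffers exactly one entry so the next key
// can be inspected without consuming it.
final class PeekingEntryIterator<K, V> implements Iterator<Map.Entry<K, V>> {

    private final Iterator<Map.Entry<K, V>> delegate;
    private Map.Entry<K, V> next; // lookahead buffer; null once exhausted

    PeekingEntryIterator(Iterator<Map.Entry<K, V>> delegate) {
        this.delegate = delegate;
        advance();
    }

    // Pulls the next entry from the delegate lazily, one at a time.
    private void advance() {
        next = delegate.hasNext() ? delegate.next() : null;
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    // Returns the key of the upcoming entry without advancing.
    K peekNextKey() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        return next.getKey();
    }

    @Override
    public Map.Entry<K, V> next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        Map.Entry<K, V> current = next;
        advance();
        return current;
    }
}
```

Because the wrapper holds only one buffered entry, memory use stays constant regardless of how many records fall in the fetched range.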





[jira] [Commented] (KAFKA-8094) Iterating over cache with get(key) is inefficient

2019-03-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790070#comment-16790070
 ] 

Sophie Blee-Goldman commented on KAFKA-8094:


After some investigation, the solution is not as straightforward as simply 
replacing TreeMap -> ConcurrentSkipListMap and using a subMap iterator. 
Iterators are only weakly consistent, and changes to the underlying map are not 
guaranteed to be reflected in the iterator. Implementing it this way may cause, 
for example, evicted entries to still be returned by the iterator (see 
ThreadCacheTest#shouldSkipEntriesWhereValueHasBeenEvictedFromCache)
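
The weak consistency mentioned above can be seen with the JDK class alone. This is a minimal sketch, not the ThreadCache code: the class name, the integer keys and the use of remove() to stand in for an eviction are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

final class WeaklyConsistentRange {

    // Iterates the keys in [2, 4] and removes key 3 from the map part-way through.
    static List<Integer> iterateWhileEvicting() {
        ConcurrentSkipListMap<Integer, String> cache = new ConcurrentSkipListMap<>();
        for (int k = 1; k <= 5; k++) {
            cache.put(k, "v" + k);
        }

        Iterator<Integer> keys = cache.subMap(2, true, 4, true).keySet().iterator();
        List<Integer> seen = new ArrayList<>();

        seen.add(keys.next()); // first key in the range
        cache.remove(3);       // stands in for an eviction during iteration

        // No ConcurrentModificationException is thrown here. Whether the
        // removed key still shows up is left open by the iterator contract.
        while (keys.hasNext()) {
            seen.add(keys.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("keys returned: " + iterateWhileEvicting());
    }
}
```

The iterator never fails, but a caller cannot rely on the removed key being skipped, which is the behavior the test named above checks for.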

> Iterating over cache with get(key) is inefficient 
> --
>
> Key: KAFKA-8094
> URL: https://issues.apache.org/jira/browse/KAFKA-8094
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: streams
>
> Currently, range queries in the caching layer are implemented by creating an 
> iterator over the subset of keys in the range, and calling get() on the 
> underlying TreeMap for each key. While this protects against 
> ConcurrentModificationException, we can improve performance by replacing the 
> TreeMap with a concurrent data structure such as ConcurrentSkipListMap and 
> then just iterating over a subMap.





[jira] [Commented] (KAFKA-8094) Iterating over cache with get(key) is inefficient

2019-03-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790080#comment-16790080
 ] 

Sophie Blee-Goldman commented on KAFKA-8094:


That said, maybe this is another occasion to question whether a returned 
iterator *should* reflect the current state of the cache/store, or the state at 
the time it was created (ie when it was queried) as a snapshot.

 

Personally I still believe the snapshot is more appropriate, and if it allows 
us to make this improvement I am all the more in favor of it (of course this 
might not be a *huge* improvement as it only saves us a factor of log(N)). 
WDYT [~guozhang]?

> Iterating over cache with get(key) is inefficient 
> --
>
> Key: KAFKA-8094
> URL: https://issues.apache.org/jira/browse/KAFKA-8094
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: streams
>
> Currently, range queries in the caching layer are implemented by creating an 
> iterator over the subset of keys in the range, and calling get() on the 
> underlying TreeMap for each key. While this protects against 
> ConcurrentModificationException, we can improve performance by replacing the 
> TreeMap with a concurrent data structure such as ConcurrentSkipListMap and 
> then just iterating over a subMap.





[jira] [Commented] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-14 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16793213#comment-16793213
 ] 

Sophie Blee-Goldman commented on KAFKA-7652:


Hi [~jonathanpdx]. I've been looking into the caching layer more deeply and 
discussed it with Guozhang; we believe his earlier patch is not an appropriate 
fix, so I have opened a PR that should address this more completely.

If you could, please try this out on top of trunk and let me know if it 
helps/how it compares: https://github.com/apache/kafka/pull/6448

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The highest amount of CPU time 
> seems spent in retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that it appears we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  





[jira] [Commented] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow

2019-03-15 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16793994#comment-16793994
 ] 

Sophie Blee-Goldman commented on KAFKA-8027:


Hi [~prashantideal], I have been looking into this and have two PRs aimed at 
improving performance of segmented stores with caching enabled. Would you be 
able to test either or both of them out, and let me know if they improve things 
at all? You can find the first PR 
[here|https://github.com/apache/kafka/pull/6433] and the second one 
[here|https://github.com/apache/kafka/pull/6448].

Keep in mind these are just improvements to the caching layer and are unlikely 
to result in overall better performance than withCachingDisabled, since as you 
point out for range queries we must search the underlying RocksDBStore anyway. 
If you don't need caching for other reasons (eg reducing downstream traffic) 
and can afford to turn it off, I recommend doing so. 

> Gradual decline in performance of CachingWindowStore provider when number of 
> keys grow
> --
>
> Key: KAFKA-8027
> URL: https://issues.apache.org/jira/browse/KAFKA-8027
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Prashant
>Priority: Major
>  Labels: interactivequ, kafka-streams
>
> We observed this during a performance test of our streams application, which 
> tracks users' activity and provides a REST interface to query the window state 
> store. We used the default configuration of Materialized, i.e. 
> withCachingEnabled, for storing user behaviour stats in a window state store 
> (CompositeWindowStore with CachingWindowStore as the underlying store, which 
> internally uses RocksDBStore for persistence).
> While querying the window store with store.fetch(key, long, long), it 
> internally tries to fetch the range from ThreadCache, which uses a byte 
> iterator to search for a key in the cache, and on a cache miss it goes to 
> RocksDBStore for the result. So when the number of keys in the cache becomes 
> large, this ThreadCache search (a range iterator over all keys) starts taking 
> time, which impacts WindowStore query performance.
>  
> Workaround: If we disable cache with switch on Materialized instance i.e. 
> withCachingDisabled, key search is delegated directly to RocksDBStore which 
> is way faster and completed search in microseconds against millis in case of 
> CachingWindowStore.  
>  
> Stats: With Unique users > 0.5M, random search for a key i.e. UserId:
>  
> withCachingEnabled :  40 < t < 80ms (upper bound increases as unique users 
> grow)
> withCahingDisabled: t < 1ms (Almost constant time)      





[jira] [Comment Edited] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow

2019-03-15 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16793994#comment-16793994
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8027 at 3/15/19 10:28 PM:
--

Hi [~prashantideal], I have been looking into this and have two PRs aimed at 
improving performance of segmented stores with caching enabled. Would you be 
able to test either or both of them out, and let me know if they improve things 
at all? You can find the first PR 
[here|https://github.com/apache/kafka/pull/6433] and the second one 
[here|https://github.com/apache/kafka/pull/6448].

Keep in mind these are just improvements to the caching layer and are unlikely 
to result in overall better fetching performance than withCachingDisabled, 
since, as you point out, for range queries we must search the underlying 
RocksDBStore anyway. If you don't need caching for other reasons (e.g. reducing 
downstream traffic or writes to RocksDB) and can afford to turn it off, I 
recommend doing so. 


was (Author: ableegoldman):
Hi [~prashantideal], I have been looking into this and have two PRs aimed at 
improving performance of segmented stores with caching enabled. Would you be 
able to test either or both of them out, and let me know if they improve things 
at all? You can find the first PR 
[here|https://github.com/apache/kafka/pull/6433] and the second one 
[here|https://github.com/apache/kafka/pull/6448].

Keep in mind these are just improvements to the caching layer and are unlikely 
to result in overall better performance than withCachingDisabled, since, as you 
point out, for range queries we must search the underlying RocksDBStore anyway. 
If you don't need caching for other reasons (e.g. reducing downstream traffic) 
and can afford to turn it off, I recommend doing so. 






[jira] [Commented] (KAFKA-8042) Kafka Streams creates many segment stores on state restore

2019-03-21 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798477#comment-16798477
 ] 

Sophie Blee-Goldman commented on KAFKA-8042:


I think KAFKA-7934 would certainly fix this issue, but it seems like an 
independent matter to address why all these segments exist at the same time 
throughout rebalancing (rather than being created and expired one at a time).

That said, I was checking out the 2.1 branch and it seems like this shouldn't 
be an issue anymore? During restore we first fast-forward through all records 
to get the max timestamp, and then only create segments that are within a 
retention period of this observed stream time.

> Kafka Streams creates many segment stores on state restore
> --
>
> Key: KAFKA-8042
> URL: https://issues.apache.org/jira/browse/KAFKA-8042
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Adrian McCague
>Priority: Major
> Attachments: StateStoreSegments-StreamsConfig.txt
>
>
> Note that this is from the perspective of one instance of an application, 
> where there are 8 instances total, with partition count 8 for all topics and 
> of course stores. Standby replicas = 1.
> In the process there are multiple instances of {{KafkaStreams}} so the below 
> detail is from one of these.
> h2. Actual Behaviour
> During state restore of an application, many segment stores are created (I am 
> using MANIFEST files as a marker since they preallocate 4MB each). As can be 
> seen this topology has 5 joins - which is the extent of its state.
> {code:java}
> bash-4.2# pwd
> /data/fooapp/0_7
> bash-4.2# for dir in $(find . -maxdepth 1 -type d); do echo "${dir}: $(find ${dir} -type f -name 'MANIFEST-*' -printf x | wc -c)"; done
> .: 8058
> ./KSTREAM-JOINOTHER-25-store: 851
> ./KSTREAM-JOINOTHER-40-store: 819
> ./KSTREAM-JOINTHIS-24-store: 851
> ./KSTREAM-JOINTHIS-29-store: 836
> ./KSTREAM-JOINOTHER-35-store: 819
> ./KSTREAM-JOINOTHER-30-store: 819
> ./KSTREAM-JOINOTHER-45-store: 745
> ./KSTREAM-JOINTHIS-39-store: 819
> ./KSTREAM-JOINTHIS-44-store: 685
> ./KSTREAM-JOINTHIS-34-store: 819
> There are many (x800 as above) of these segment files:
> ./KSTREAM-JOINOTHER-25-store.155146629
> ./KSTREAM-JOINOTHER-25-store.155155902
> ./KSTREAM-JOINOTHER-25-store.155149269
> ./KSTREAM-JOINOTHER-25-store.155154879
> ./KSTREAM-JOINOTHER-25-store.155169861
> ./KSTREAM-JOINOTHER-25-store.155153064
> ./KSTREAM-JOINOTHER-25-store.155148444
> ./KSTREAM-JOINOTHER-25-store.155155671
> ./KSTREAM-JOINOTHER-25-store.155168673
> ./KSTREAM-JOINOTHER-25-store.155159565
> ./KSTREAM-JOINOTHER-25-store.155175735
> ./KSTREAM-JOINOTHER-25-store.155168574
> ./KSTREAM-JOINOTHER-25-store.155163525
> ./KSTREAM-JOINOTHER-25-store.155165241
> ./KSTREAM-JOINOTHER-25-store.155146662
> ./KSTREAM-JOINOTHER-25-store.155178177
> ./KSTREAM-JOINOTHER-25-store.155158740
> ./KSTREAM-JOINOTHER-25-store.155168145
> ./KSTREAM-JOINOTHER-25-store.155166231
> ./KSTREAM-JOINOTHER-25-store.155172171
> ./KSTREAM-JOINOTHER-25-store.155175075
> ./KSTREAM-JOINOTHER-25-store.155163096
> ./KSTREAM-JOINOTHER-25-store.155161512
> ./KSTREAM-JOINOTHER-25-store.155179233
> ./KSTREAM-JOINOTHER-25-store.155146266
> ./KSTREAM-JOINOTHER-25-store.155153691
> ./KSTREAM-JOINOTHER-25-store.155159235
> ./KSTREAM-JOINOTHER-25-store.155152734
> ./KSTREAM-JOINOTHER-25-store.155160687
> ./KSTREAM-JOINOTHER-25-store.155174415
> ./KSTREAM-JOINOTHER-25-store.155150820
> ./KSTREAM-JOINOTHER-25-store.155148642
> ... etc
> {code}
> Once re-balancing and state restoration is complete - the redundant segment 
> files are deleted and the segment count drops to 508 total (where the above 
> mentioned state directory is one of many).
> We have seen the number of these segment stores grow to as many as 15000 over 
> the baseline 508 which can fill smaller volumes. *This means that a state 
> volume that would normally have ~300MB total disk usage would use in excess 
> of 30GB during rebalancing*, mostly preallocated MANIFEST files.
> h2. Expected Behaviour
> For this particular application we expect 508 segment folders total to be 
> active and existing throughout rebalancing. Give or take migrated tasks that 
> are subject to the {{state.cleanup.delay.ms}}.
> h2. Preliminary investigation
> * This 

[jira] [Created] (KAFKA-7929) RocksDB Window Store allows segments to expire out from under iterator

2019-02-13 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-7929:
--

 Summary: RocksDB Window Store allows segments to expire out from 
under iterator
 Key: KAFKA-7929
 URL: https://issues.apache.org/jira/browse/KAFKA-7929
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


While we provide no guarantees about returning a snapshot when fetching from 
persistent window stores, we should at least not allow old segments to expire 
while an iterator over them remains open. This can result in unexpected 
behavior as the number of records returned depends on how quickly the results 
are read from an iterator, and you might even end up reading records with a gap 
in the middle.

 

For example, you might fetch records between t1 and t3, then immediately read 
the first record (t1) and do some processing. If enough time advances by the 
time you read the second record from the iterator, record t2 may have expired, 
so the next record you would read is t3. You would therefore conclude that 
there were records at t1 and t3 but nothing at t2, which is incorrect. 
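
The scenario above can be sketched in plain Java. This is not the actual window store code: the class ExpiringFetchSketch, its fetch method, and the TreeMap standing in for the segments are all hypothetical. The only point is that a reader which resolves the next record lazily will silently skip whatever expired while it was busy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class ExpiringFetchSketch {

    // Reads timestamps in [from, to] one at a time, resolving the next record
    // lazily. After the first record is read, everything up to and including
    // expireUpTo is dropped, standing in for segment expiry while the caller
    // is busy processing.
    static List<Long> fetch(long from, long to, long expireUpTo) {
        TreeMap<Long, String> store = new TreeMap<>();
        store.put(1L, "record@t1");
        store.put(2L, "record@t2");
        store.put(3L, "record@t3");

        List<Long> seen = new ArrayList<>();
        Long next = store.ceilingKey(from);
        boolean expired = false;
        while (next != null && next <= to) {
            seen.add(next);
            if (!expired) {
                // Stream time advances: old entries expire mid-iteration.
                store.headMap(expireUpTo, true).clear();
                expired = true;
            }
            next = store.higherKey(next);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fetch(1L, 3L, 2L)); // [1, 3]: t2 expired before it was read
        System.out.println(fetch(1L, 3L, 0L)); // [1, 2, 3]: nothing expired
    }
}
```

The caller of the first fetch has no way to tell that t2 ever existed, which is the gap described above.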





[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767663#comment-16767663
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


Makes sense, I will consider the caching layer to be outside of the immediate 
scope for now. Does it make sense to break up the PR into one for each type of 
store (ie kv, window, session)? The implementation changes will be rather minor 
but the testing framework requires some overhaul since  is 
hardcoded into all the unit tests... (still relatively minor, but certainly 
many many lines of code)

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767611#comment-16767611
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


In light of the decision to close KAFKA-7917 should we include the three 
CachingXXStore layers in this as well?






[jira] [Assigned] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-7918:
--

Assignee: Sophie Blee-Goldman






[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765587#comment-16765587
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


For the LRUStore, there are two classes this should be applied to:

5) MemoryLRUCache

6) MemoryNavigableLRUCache






[jira] [Issue Comment Deleted] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-7918:
---
Comment: was deleted

(was: The RocksDB key-value store is actually already implemented as a  store, so this applies to the following stores:

 
 * RocksDBWindowStore
 * RocksDBSessionStore
 * MemoryLRUCache
 * MemoryNavigableLRUCache
 * InMemoryKeyValueStore
 * (upcoming) InMemoryWindowStore)






[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765585#comment-16765585
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


The RocksDB key-value store is actually already implemented as a  store, so this applies to the following stores:

 
 * RocksDBWindowStore
 * RocksDBSessionStore
 * MemoryLRUCache
 * MemoryNavigableLRUCache
 * InMemoryKeyValueStore
 * (upcoming) InMemoryWindowStore






[jira] [Comment Edited] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765587#comment-16765587
 ] 

Sophie Blee-Goldman edited comment on KAFKA-7918 at 2/12/19 1:43 AM:
-

For the LRUStore, there are two classes this should be applied to:

4)
 # MemoryLRUCache
 # MemoryNavigableLRUCache


was (Author: ableegoldman):
For the LRUStore, there are two classes this should be applied to:

5) MemoryLRUCache

6) MemoryNavigableLRUCache






[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765593#comment-16765593
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


A number of tests will need to be reworked, but I don't see any public APIs 
standing in the way 




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7934) Optimize restore for windowed and session stores

2019-02-19 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16772534#comment-16772534
 ] 

Sophie Blee-Goldman commented on KAFKA-7934:


Is there some reason we can't just work backwards from the last message using a 
putIfAbsent method (which would need to be implemented for these stores, I 
believe)? That would definitely minimize the number of expired records we 
insert and then delete.

> Optimize restore for windowed and session stores
> 
>
> Key: KAFKA-7934
> URL: https://issues.apache.org/jira/browse/KAFKA-7934
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> During state restore of window/session stores, the changelog topic is scanned 
> from the oldest entry to the newest entry. This happens on a 
> record-by-record basis or in record batches.
> During this process, new segments are created as time advances (based on 
> the timestamps of the records that are restored). However, depending on 
> the retention time, we might expire those segments again later in the restore 
> process. This is wasteful. Because retention time is based on the largest 
> timestamp per partition, it is possible to compute a bound for live and 
> expired segments upfront (assuming that we know the largest timestamp). This 
> way, during restore, we could avoid creating segments that are expired later 
> anyway and skip over all corresponding records.
> The problem is that we don't know the largest timestamp per partition. Maybe 
> the broker timestamp index could help to provide an approximation of this 
> value.
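
A rough sketch of the bound described in the issue, assuming the largest timestamp is already known. The class RestoreBoundSketch and its methods are hypothetical, and the mapping of a timestamp to a segment (integer division by a fixed segment interval) is an assumption made for illustration, not a statement about the actual store implementation.

```java
class RestoreBoundSketch {

    // Assumed mapping of a record timestamp to a segment: fixed-width intervals.
    static long segmentId(long timestampMs, long segmentIntervalMs) {
        return timestampMs / segmentIntervalMs;
    }

    // Oldest segment that is still live once the largest timestamp is known.
    static long minLiveSegment(long maxTimestampMs, long retentionMs, long segmentIntervalMs) {
        return segmentId(Math.max(0L, maxTimestampMs - retentionMs), segmentIntervalMs);
    }

    // True if restoring this record would only create a segment that expires anyway.
    static boolean canSkip(long recordTimestampMs, long maxTimestampMs,
                           long retentionMs, long segmentIntervalMs) {
        return segmentId(recordTimestampMs, segmentIntervalMs)
            < minLiveSegment(maxTimestampMs, retentionMs, segmentIntervalMs);
    }

    public static void main(String[] args) {
        // Hypothetical values: 60s retention, 20s segments, newest record at 130s.
        long retentionMs = 60_000L;
        long segmentIntervalMs = 20_000L;
        long maxTimestampMs = 130_000L;
        System.out.println(canSkip(50_000L, maxTimestampMs, retentionMs, segmentIntervalMs));  // true
        System.out.println(canSkip(65_000L, maxTimestampMs, retentionMs, segmentIntervalMs));  // false
    }
}
```

The record at 65s falls before the retention cutoff of 70s but shares a segment with live records, so under this assumed mapping it is not skipped.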





[jira] [Created] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-02-08 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-7912:
--

 Summary: In-memory key-value store does not support concurrent 
access 
 Key: KAFKA-7912
 URL: https://issues.apache.org/jira/browse/KAFKA-7912
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman


Currently, the in-memory key-value store uses a Map to store key-value pairs 
and fetches them by calling subMap and returning an iterator to this submap. 
This is unsafe as the submap is just a view of the original map and there is 
risk of concurrent access.
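
To illustrate why a submap view is fragile, here is a minimal sketch in plain Java. SubMapViewSketch is a hypothetical class, not the in-memory store itself, and the write happens on the same thread as a deterministic stand-in for a write arriving from another thread while a range iterator is open.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SubMapViewSketch {

    // Iterates a range view while the backing map is modified mid-iteration.
    // Returns true if the iterator failed with ConcurrentModificationException.
    static boolean failsOnModification(NavigableMap<String, String> store) {
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        // subMap returns a live view of the backing map, not a copy.
        Iterator<Map.Entry<String, String>> range =
            store.subMap("a", true, "c", true).entrySet().iterator();
        range.next();
        store.put("d", "4"); // a write that lands while the range iterator is open
        try {
            while (range.hasNext()) {
                range.next();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("TreeMap fails: " + failsOnModification(new TreeMap<>()));
        System.out.println("ConcurrentSkipListMap fails: "
            + failsOnModification(new ConcurrentSkipListMap<>()));
    }
}
```

Note that the inserted key lies outside the queried range and the TreeMap view still fails, because the view shares the backing map's modification count. A ConcurrentSkipListMap iterator is weakly consistent and does not throw.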





[jira] [Created] (KAFKA-8215) Limit memory usage of RocksDB

2019-04-10 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8215:
--

 Summary: Limit memory usage of RocksDB
 Key: KAFKA-8215
 URL: https://issues.apache.org/jira/browse/KAFKA-8215
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


The memory usage of Streams is currently unbounded in part because of RocksDB, 
which consumes memory on a per-instance basis. Each instance (ie each 
persistent state store) will have its own write buffer, index blocks, and block 
cache. The size of these can be configured individually, but there is currently 
no way for a Streams app to limit the total memory available across instances. 
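
As a back-of-the-envelope illustration of why per-instance limits do not bound the total, the sketch below simply multiplies a per-instance estimate by the number of instances. RocksDbMemorySketch and all of the numbers are hypothetical; they are not Kafka Streams or RocksDB defaults, and index and filter blocks are left out of the estimate.

```java
class RocksDbMemorySketch {

    static final long MIB = 1024L * 1024L;

    // Rough upper bound for one RocksDB instance: its block cache plus all of
    // its write buffers. Index and filter blocks are not counted here.
    static long perInstanceBytes(long blockCacheBytes, long writeBufferBytes, int maxWriteBuffers) {
        return blockCacheBytes + writeBufferBytes * maxWriteBuffers;
    }

    // Nothing caps the sum, so the estimate grows linearly with the store count.
    static long totalBytes(int instances, long blockCacheBytes,
                           long writeBufferBytes, int maxWriteBuffers) {
        return instances * perInstanceBytes(blockCacheBytes, writeBufferBytes, maxWriteBuffers);
    }

    public static void main(String[] args) {
        // Hypothetical settings, chosen only to make the arithmetic visible.
        long total = totalBytes(40, 50 * MIB, 16 * MIB, 3);
        System.out.println(total / MIB + " MiB across 40 instances"); // 3920 MiB across 40 instances
    }
}
```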

 





[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2019-04-10 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-6579:
--

Assignee: Sophie Blee-Goldman  (was: Ben Webb)

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.





[jira] [Updated] (KAFKA-5636) Add Sliding-Window support for Aggregations

2019-04-14 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-5636:
---
Description: 
We support three windowing types for aggregations in the DSL right now:
 * Tumbling windows
 * Hopping windows (note: some stream processing tools call these "sliding 
windows")
 * Session windows

Some users have expressed the need for sliding windows. While we do use sliding 
windows for joins, we do not yet support sliding window aggregations in the DSL

  was:
We support three windowing types in the DSL right now:

* Tumbling windows
* Hopping windows (note: some stream processing tools call these "sliding 
windows")
* Session windows

Some users have expressed the need for sliding windows.  We already support 
sliding windows because they are used for joins, but we don't expose sliding 
windows directly through the API/DSL.

Summary: Add Sliding-Window support for Aggregations  (was: DSL: allow 
sliding windows to be used directly (i.e. not just implicitly when doing joins))

> Add Sliding-Window support for Aggregations
> ---
>
> Key: KAFKA-5636
> URL: https://issues.apache.org/jira/browse/KAFKA-5636
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Michael Noll
>Priority: Minor
>
> We support three windowing types for aggregations in the DSL right now:
>  * Tumbling windows
>  * Hopping windows (note: some stream processing tools call these "sliding 
> windows")
>  * Session windows
> Some users have expressed the need for sliding windows. While we do use 
> sliding windows for joins, we do not yet support sliding window aggregations 
> in the DSL
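
To make the distinction between the window types concrete, here is a small sketch in plain Java. WindowSketch and its method names are hypothetical and this is not the DSL API. It assumes hopping windows are half-open intervals [start, start + size) aligned to multiples of the advance, and that a join-style sliding window is defined relative to the record's own timestamp.

```java
import java.util.ArrayList;
import java.util.List;

class WindowSketch {

    // Start times of all hopping windows [start, start + sizeMs) that contain
    // timestampMs. With advanceMs == sizeMs this yields one tumbling window.
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Sliding window as used for joins: bounds are relative to the record itself
    // rather than aligned to fixed boundaries.
    static long[] slidingWindowBounds(long timestampMs, long beforeMs, long afterMs) {
        return new long[] {Math.max(0L, timestampMs - beforeMs), timestampMs + afterMs};
    }

    public static void main(String[] args) {
        System.out.println(hoppingWindowStarts(7L, 5L, 5L));   // [5]: tumbling
        System.out.println(hoppingWindowStarts(7L, 10L, 5L));  // [0, 5]: hopping
        long[] bounds = slidingWindowBounds(7L, 3L, 3L);
        System.out.println(bounds[0] + ".." + bounds[1]);      // 4..10
    }
}
```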





[jira] [Created] (KAFKA-8094) Iterating over cache with get(key) is inefficient

2019-03-11 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8094:
--

 Summary: Iterating over cache with get(key) is inefficient 
 Key: KAFKA-8094
 URL: https://issues.apache.org/jira/browse/KAFKA-8094
 Project: Kafka
  Issue Type: Improvement
Reporter: Sophie Blee-Goldman


Currently, range queries in the caching layer are implemented by creating an 
iterator over the subset of keys in the range, and calling get() on the 
underlying TreeMap for each key. While this protects against 
ConcurrentModificationException, we can improve performance by replacing the 
TreeMap with a concurrent data structure such as ConcurrentSkipListMap and then 
just iterating over a subMap.
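
A minimal sketch of the two approaches, using only java.util collections. The 
class and method names (CacheRangeSketch, rangeByGet, rangeBySubMap) are 
made up for illustration and this is not the actual caching-layer code; it 
only contrasts one get() per key against iterating a subMap view.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class CacheRangeSketch {

    // Approach described in the ticket: snapshot the keys in the range first,
    // then look each key up again in the TreeMap.
    static List<String> rangeByGet(TreeMap<String, String> cache, String from, String to) {
        List<String> keys = new ArrayList<>(cache.subMap(from, true, to, true).keySet());
        List<String> values = new ArrayList<>();
        for (String key : keys) {
            String value = cache.get(key); // one O(log n) lookup per key
            if (value != null) {           // the entry may have been evicted meanwhile
                values.add(value);
            }
        }
        return values;
    }

    // Proposed approach: iterate a subMap view of a ConcurrentSkipListMap.
    // Its iterators are weakly consistent and never throw
    // ConcurrentModificationException.
    static List<String> rangeBySubMap(ConcurrentSkipListMap<String, String> cache,
                                      String from, String to) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : cache.subMap(from, true, to, true).entrySet()) {
            values.add(entry.getValue());
        }
        return values;
    }
}
```

Both methods return the same values for the same contents; the second avoids 
the extra lookup per key and the intermediate key list.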





[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-15 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840588#comment-16840588
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


Actually, even if you are not using the ConfigSetter, the BloomFilter object was 
also being leaked. There's a patch for this that should be available in 2.2.1.

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  





[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-15 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840590#comment-16840590
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


Sorry, I commented before reading the ticket. As John mentioned, the leak is 
probably due to user-constructed objects in the ConfigSetter. I tried to check 
carefully for any other leaked objects after finding the BloomFilter, although 
it's unclear why upgrading should have introduced a leak unless you always 
reconfigured your Options?






[jira] [Issue Comment Deleted] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-15 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8367:
---
Comment: was deleted

(was: Actually, even if you are not using the ConfigSetter the BloomFilter 
object was also being leaked. There's a patch for this that should be available 
in 2.2.1)






[jira] [Comment Edited] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-16 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841748#comment-16841748
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8367 at 5/16/19 9:46 PM:
-

Hm. I notice that in going from 2.0 to 2.1 we upgraded Rocks from 5.7.3 to 
5.15.10, and I would like to rule out the possibility that the leak is in Rocks 
itself. If we can test the older version of Rocks with the newer version of 
Streams, that should help us isolate the problem. I opened a quick branch off 
2.2 with Rocks downgraded to v5.7. Can you build from this branch 
[https://github.com/ableegoldman/kafka/tree/testRocksDBleak] and see if the 
leak is still present? 

Did your RocksDBConfigSetter before 2.2.0 use/set the same configs, or did any 
of those change? I agree your ConfigSetter shouldn't be leaking; I'm just 
trying to get all the details. It might also be worth investigating whether the 
leak has been present since 2.1 or only since 2.2.


was (Author: ableegoldman):
Hm. I notice in going from 2.0 to 2.1 we upgraded Rocks from 5.7.3 to 5.15.10 
and would like to rule out the possibility that the leak is in Rocks itself. If 
we can test the older version of Rocks with the newer version of Streams that 
should help us isolate the problem. I opened a quick branch off 2.2 with Rocks 
downgraded to v5.7 – can you build from 
[this|[https://github.com/ableegoldman/kafka/tree/testRocksDBleak]] and see if 
the leak is still present? 

Did your RocksDBConfigSetter before 2.2.0 use/set the same configs or did any 
of those change? I agree your ConfigSetter shouldn't be leaking just trying to 
get all the details. It might also be worth investigating whether the leak is 
present  since 2.1 or just 2.2






[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-16 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841748#comment-16841748
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


Hm. I notice that in going from 2.0 to 2.1 we upgraded Rocks from 5.7.3 to 
5.15.10, and I would like to rule out the possibility that the leak is in Rocks 
itself. If we can test the older version of Rocks with the newer version of 
Streams, that should help us isolate the problem. I opened a quick branch off 
2.2 with Rocks downgraded to v5.7. Can you build from this branch 
[https://github.com/ableegoldman/kafka/tree/testRocksDBleak] and see if the 
leak is still present? 

Did your RocksDBConfigSetter before 2.2.0 use/set the same configs, or did any 
of those change? I agree your ConfigSetter shouldn't be leaking; I'm just 
trying to get all the details. It might also be worth investigating whether the 
leak has been present since 2.1 or only since 2.2.






[jira] [Created] (KAFKA-8372) Remove deprecated RocksDB#compactRange API

2019-05-15 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8372:
--

 Summary: Remove deprecated RocksDB#compactRange API
 Key: KAFKA-8372
 URL: https://issues.apache.org/jira/browse/KAFKA-8372
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.0.0
Reporter: Sophie Blee-Goldman


In upgrading Rocks from v5.15 to v5.18, several of the RocksDB#compactRange() 
methods were deprecated in favor of variations leveraging the new 
CompactRangeOptions object. However, v5.18 left a gap in the API: no signature 
lets you pass in an options object without also passing start and end byte[] 
keys specifying the range to be compacted. Since we want to compact the entire 
key range rather than a subrange, while still needing to specify several 
options, the deprecation warning was suppressed for now.

In v6.0 this gap is closed: you can pass null for both start and end to specify 
that the entire range should be compacted.

When upgrading Rocks to v6.0 or later, the deprecation suppressions should be 
removed from DualColumnFamilyAccessor#toggleDBForBulkLoading() 
(RocksDBTimestampedStore.java) and 
SingleColumnFamilyAccessor#toggleDBForBulkLoading() (RocksDBStore.java), and the 
following replacement should be made:

 

db.compactRange(columnFamily, true, 1, 0)

-->

db.compactRange(columnFamily, null, null, CompactRangeOptions);

 

NOTE: CompactRangeOptions extends RocksObject and as such should be closed to 
avoid leaking memory!
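
A minimal sketch of the close-after-use pattern the note asks for. RocksDB is 
deliberately not used here: FakeNativeOptions and compactEverything are 
hypothetical stand-ins, with FakeNativeOptions playing the role of a 
native-backed object such as CompactRangeOptions. The only point is that 
try-with-resources releases the object even if the compaction call throws.

```java
final class CloseOptionsSketch {

    // Hypothetical stand-in for a native-backed options object.
    static final class FakeNativeOptions implements AutoCloseable {
        static int open = 0; // number of handles created but not yet closed

        FakeNativeOptions() {
            open++;
        }

        @Override
        public void close() {
            open--;
        }
    }

    // Hypothetical stand-in for the compaction call. In the ticket's terms this
    // would be db.compactRange(columnFamily, null, null, options).
    static void compactEverything(FakeNativeOptions options) {
        // intentionally empty in this sketch
    }

    static void toggleForBulkLoading() {
        try (FakeNativeOptions options = new FakeNativeOptions()) {
            compactEverything(options);
        } // close() runs here, whether or not compactEverything threw
    }
}
```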

 





[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8347:
---
Affects Version/s: (was: 2.1.1)
   2.1.0

> Choose next record to process by timestamp
> --
>
> Key: KAFKA-8347
> URL: https://issues.apache.org/jira/browse/KAFKA-8347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently PartitionGroup will determine the next record to process by 
> choosing the partition with the lowest stream time. However if a partition 
> contains out of order data its stream time may be significantly larger than 
> the timestamp of the next record. The next record should instead be chosen as 
> the record with the lowest timestamp across all partitions, regardless of 
> which partition it comes from or what its partition time is.
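
A small sketch of the difference between the two selection rules. Everything 
here is hypothetical and is not the PartitionGroup code: PartitionBuffer is a 
made-up stand-in for one input partition, and its stream time is assumed to 
be the highest timestamp seen so far on that partition.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

final class NextRecordSketch {

    // Hypothetical stand-in for the buffered records of one input partition.
    static final class PartitionBuffer {
        final String name;
        final long streamTime;        // assumed: highest timestamp seen so far
        final Deque<Long> timestamps; // buffered record timestamps, head is next

        PartitionBuffer(String name, long streamTime, Long... buffered) {
            this.name = name;
            this.streamTime = streamTime;
            this.timestamps = new ArrayDeque<>(Arrays.asList(buffered));
        }

        long headTimestamp() {
            return timestamps.peekFirst();
        }
    }

    // Rule described as current: pick the partition with the lowest stream time.
    static PartitionBuffer pickByStreamTime(List<PartitionBuffer> partitions) {
        return partitions.stream()
            .filter(p -> !p.timestamps.isEmpty())
            .min(Comparator.comparingLong((PartitionBuffer p) -> p.streamTime))
            .orElse(null);
    }

    // Rule proposed in the ticket: pick the partition whose next record has the
    // lowest timestamp, regardless of that partition's stream time.
    static PartitionBuffer pickByHeadTimestamp(List<PartitionBuffer> partitions) {
        return partitions.stream()
            .filter(p -> !p.timestamps.isEmpty())
            .min(Comparator.comparingLong(PartitionBuffer::headTimestamp))
            .orElse(null);
    }
}
```

With partition A at stream time 100 but an out-of-order next record at 
timestamp 10, and partition B at stream time 50 with its next record at 60, 
the first rule picks B while the second picks A.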





[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8347:
---
Affects Version/s: (was: 2.3.0)
   2.2.0
   2.1.1

> Choose next record to process by timestamp
> --
>
> Key: KAFKA-8347
> URL: https://issues.apache.org/jira/browse/KAFKA-8347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently PartitionGroup will determine the next record to process by 
> choosing the partition with the lowest stream time. However if a partition 
> contains out of order data its stream time may be significantly larger than 
> the timestamp of the next record. The next record should instead be chosen as 
> the record with the lowest timestamp across all partitions, regardless of 
> which partition it comes from or what its partition time is.





[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8347:
---
Fix Version/s: 2.2.1
   2.1.2

> Choose next record to process by timestamp
> --
>
> Key: KAFKA-8347
> URL: https://issues.apache.org/jira/browse/KAFKA-8347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Currently PartitionGroup will determine the next record to process by 
> choosing the partition with the lowest stream time. However if a partition 
> contains out of order data its stream time may be significantly larger than 
> the timestamp of the next record. The next record should instead be chosen as 
> the record with the lowest timestamp across all partitions, regardless of 
> which partition it comes from or what its partition time is.





[jira] [Updated] (KAFKA-8215) Limit memory usage of RocksDB

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8215:
---
Fix Version/s: 2.3.0

> Limit memory usage of RocksDB
> -
>
> Key: KAFKA-8215
> URL: https://issues.apache.org/jira/browse/KAFKA-8215
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> The memory usage of Streams is currently unbounded in part because of 
> RocksDB, which consumes memory on a per-instance basis. Each instance (ie 
> each persistent state store) will have its own write buffer, index blocks, 
> and block cache. The size of these can be configured individually, but there 
> is currently no way for a Streams app to limit the total memory available 
> across instances. 
>  
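
A rough back-of-the-envelope sketch of why this adds up. The estimate below 
assumes each instance can use up to its block cache plus all of its write 
buffers, and it leaves out index blocks and any other overhead. The class 
name and the sizes in the usage note are illustrative only.

```java
final class RocksDbMemoryEstimate {
    static final long MIB = 1024L * 1024L;

    // Rough upper bound for one RocksDB instance: the block cache plus every
    // write buffer it is allowed to hold. Index blocks are not counted here.
    static long perInstanceBytes(long blockCacheBytes, long writeBufferBytes, int maxWriteBuffers) {
        return blockCacheBytes + writeBufferBytes * maxWriteBuffers;
    }

    // With no shared limit, the total simply scales with the instance count.
    static long totalBytes(int instances, long perInstanceBytes) {
        return instances * perInstanceBytes;
    }
}
```

For example, a 4 MiB block cache with three 2 MiB write buffers gives about 
10 MiB per instance by this estimate, so 200 instances would come to about 
2000 MiB, and nothing in the per-instance settings caps that sum.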





[jira] [Resolved] (KAFKA-8215) Limit memory usage of RocksDB

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8215.

Resolution: Fixed






[jira] [Commented] (KAFKA-8478) Poll for more records before forced processing

2019-06-04 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16855957#comment-16855957
 ] 

Sophie Blee-Goldman commented on KAFKA-8478:


Also related to KAFKA-7458

> Poll for more records before forced processing
> --
>
> Key: KAFKA-8478
> URL: https://issues.apache.org/jira/browse/KAFKA-8478
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> While analyzing the algorithm of Streams's poll/process loop, I noticed the 
> following:
> The algorithm of runOnce is:
> {code}
> loop0:
>   long poll for records (100ms)
>   loop1:
>     loop2: for BATCH_SIZE iterations:
>       process one record in each task that has data enqueued
>     adjust BATCH_SIZE
>     if loop2 processed any records, repeat loop1
>     else, break loop1 and repeat loop0
> {code}
> There's potentially an unwanted interaction between "keep processing as long 
> as any record is processed" and forcing processing after `max.task.idle.ms`.
> If there are two tasks, A and B, and A runs out of records on one input 
> before B, then B could keep the processing loop running, and hence prevent A 
> from getting any new records, until max.task.idle.ms expires, at which point 
> A will force processing on its other input partition. The intent of idling is 
> to at least give A a chance of getting more records on the empty input, but 
> under this situation, we'd never even check for more records before forcing 
> processing.
> I'm thinking we should only enforce processing if there was a completed poll 
> since we noticed the task was missing inputs (otherwise, we may as well not 
> bother idling at all).
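
One way the proposed rule could be sketched: a task may force processing only 
when the idle timeout has elapsed and at least one poll has completed since 
it noticed the missing input. IdleGateSketch and its method names are 
hypothetical, not existing Streams classes, and timestamps are passed in 
explicitly to keep the sketch deterministic.

```java
final class IdleGateSketch {
    private final long maxTaskIdleMs;
    private long idleStartMs = -1L;       // -1 means the task is not idling
    private boolean polledSinceIdleStart; // has a poll completed since idling began?

    IdleGateSketch(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // Called when the task first notices that one of its inputs is empty.
    void onInputMissing(long nowMs) {
        if (idleStartMs < 0) {
            idleStartMs = nowMs;
            polledSinceIdleStart = false;
        }
    }

    // Called after every completed consumer poll.
    void onPollCompleted() {
        if (idleStartMs >= 0) {
            polledSinceIdleStart = true;
        }
    }

    // Called when all inputs have data again, so idling ends.
    void onAllInputsAvailable() {
        idleStartMs = -1L;
        polledSinceIdleStart = false;
    }

    // Forced processing needs both the elapsed timeout and a completed poll.
    boolean mayForceProcessing(long nowMs) {
        return idleStartMs >= 0
            && nowMs - idleStartMs >= maxTaskIdleMs
            && polledSinceIdleStart;
    }
}
```

In the A/B scenario above, task A would call onInputMissing when its input 
runs dry; even after max.task.idle.ms has passed, mayForceProcessing stays 
false until the loop has gone back to loop0 and completed a poll.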





[jira] [Updated] (KAFKA-8494) Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor (part 4)

2019-06-14 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8494:
---
Summary: Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor 
(part 4)  (was: Refactor Consumer#StickyAssignor to support incremental 
protocol (part 4))

> Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor (part 4)
> ---
>
> Key: KAFKA-8494
> URL: https://issues.apache.org/jira/browse/KAFKA-8494
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Major
>






[jira] [Resolved] (KAFKA-8491) Bump up Consumer Protocol to v2 (part 1)

2019-06-18 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8491.

Resolution: Fixed

> Bump up Consumer Protocol to v2 (part 1)
> 
>
> Key: KAFKA-8491
> URL: https://issues.apache.org/jira/browse/KAFKA-8491
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>






[jira] [Updated] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-06-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-4600:
---
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-8179

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.





[jira] [Commented] (KAFKA-8179) Incremental Rebalance Protocol for Kafka Consumer

2019-06-13 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863553#comment-16863553
 ] 

Sophie Blee-Goldman commented on KAFKA-8179:


KIP-429 will piggy-back a fix for this issue

> Incremental Rebalance Protocol for Kafka Consumer
> -
>
> Key: KAFKA-8179
> URL: https://issues.apache.org/jira/browse/KAFKA-8179
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Recently the Kafka community has been promoting cooperative rebalancing to 
> mitigate the pain points of the stop-the-world rebalancing protocol. This 
> ticket is created to initiate that idea at the Kafka consumer client, which 
> will be beneficial for heavy-stateful consumers such as Kafka Streams 
> applications.
> In short, the scope of this ticket includes reducing unnecessary rebalance 
> latency due to heavy partition migration, i.e. partitions being revoked and 
> re-assigned. This would make the built-in consumer assignors (range, 
> round-robin etc) aware of previously assigned partitions and sticky on a 
> best-effort basis.





[jira] [Assigned] (KAFKA-8510) Update StreamsPartitionAssignor to use the built-in owned partitions to achieve stickiness (part 7)

2019-06-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8510:
--

Assignee: Sophie Blee-Goldman

> Update StreamsPartitionAssignor to use the built-in owned partitions to 
> achieve stickiness (part 7)
> ---
>
> Key: KAFKA-8510
> URL: https://issues.apache.org/jira/browse/KAFKA-8510
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Today this information is encoded as part of the user data bytes; we can now 
> remove it and rely directly on the owned partitions in the protocol.





[jira] [Assigned] (KAFKA-8494) Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor (part 4)

2019-06-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8494:
--

Assignee: Sophie Blee-Goldman

> Refactor Consumer#StickyAssignor and add CooperativeStickyAssignor (part 4)
> ---
>
> Key: KAFKA-8494
> URL: https://issues.apache.org/jira/browse/KAFKA-8494
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>






[jira] [Updated] (KAFKA-8575) Investigate cleaning up task suspension (part 7)

2019-06-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8575:
---
Summary: Investigate cleaning up task suspension (part 7)  (was: 
Investigate cleaning up task suspension)

> Investigate cleaning up task suspension (part 7)
> 
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up.





[jira] [Created] (KAFKA-8575) Investigate cleaning up task suspension

2019-06-20 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8575:
--

 Summary: Investigate cleaning up task suspension
 Key: KAFKA-8575
 URL: https://issues.apache.org/jira/browse/KAFKA-8575
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sophie Blee-Goldman


With KIP-429 the suspend/resume of tasks may have minimal gains while adding a 
lot of complexity and potential bugs. We should consider removing/cleaning it 
up.





[jira] [Updated] (KAFKA-8575) Investigate cleaning up task suspension (part 8)

2019-06-21 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8575:
---
Summary: Investigate cleaning up task suspension (part 8)  (was: 
Investigate cleaning up task suspension (part 7))

> Investigate cleaning up task suspension (part 8)
> 
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up.





[jira] [Created] (KAFKA-8347) Choose next record to process by timestamp

2019-05-09 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8347:
--

 Summary: Choose next record to process by timestamp
 Key: KAFKA-8347
 URL: https://issues.apache.org/jira/browse/KAFKA-8347
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


Currently PartitionGroup determines the next record to process by choosing 
the partition with the lowest stream time. However, if a partition contains 
out-of-order data, its stream time may be significantly larger than the 
timestamp of the next record. The next record should instead be chosen as the 
record with the lowest timestamp across all partitions, regardless of which 
partition it comes from or what its partition time is.
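
A minimal sketch of the proposed selection rule, using only the JDK. The PartitionQueue class below is a hypothetical stand-in and not the actual Kafka Streams RecordQueue or PartitionGroup; it only illustrates ordering a priority queue by each partition's head-record timestamp instead of by its partition time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

class NextRecordSketch {
    /** Hypothetical stand-in for a per-partition buffer; not the Kafka Streams RecordQueue. */
    static final class PartitionQueue {
        final String name;
        final Deque<Long> timestamps = new ArrayDeque<>();
        long partitionTime = -1L; // max timestamp processed so far on this partition

        PartitionQueue(String name, List<Long> buffered) {
            this.name = name;
            timestamps.addAll(buffered);
        }

        // Only called while the queue is non-empty.
        long headTimestamp() { return timestamps.peekFirst(); }

        long poll() {
            long ts = timestamps.pollFirst();
            partitionTime = Math.max(partitionTime, ts);
            return ts;
        }
    }

    /** Drains all partitions, always taking the record with the lowest head timestamp. */
    static List<String> drain(List<PartitionQueue> partitions) {
        PriorityQueue<PartitionQueue> byHead =
            new PriorityQueue<>(Comparator.comparingLong(PartitionQueue::headTimestamp));
        for (PartitionQueue q : partitions) {
            if (!q.timestamps.isEmpty()) byHead.add(q);
        }
        List<String> order = new ArrayList<>();
        while (!byHead.isEmpty()) {
            PartitionQueue q = byHead.poll();
            order.add(q.name + "@" + q.poll());
            if (!q.timestamps.isEmpty()) byHead.add(q); // re-insert keyed by its new head
        }
        return order;
    }

    public static void main(String[] args) {
        List<PartitionQueue> partitions = List.of(
            new PartitionQueue("A", List.of(1L, 7L, 3L)),
            new PartitionQueue("B", List.of(2L, 8L, 9L)));
        System.out.println(drain(partitions));
    }
}
```

With these inputs, once A's record at 7 has been processed, A's partition time is 7 but its head record has timestamp 3. Ordering by head timestamp picks A@3 before B@8, whereas ordering by partition time would pick B first.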





[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836606#comment-16836606
 ] 

Sophie Blee-Goldman commented on KAFKA-8315:


[~the4thamigo_uk] The logic for choosing the next record to process is in 
org.apache.kafka.streams.processor.internals.PartitionGroup, which contains a 
priority queue, "nonEmptyQueuesByTime", that serves up the partition with the 
lowest timestamp when polled (unless max.task.idle.ms has passed, as checked in 
StreamTask#isProcessable).

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)





[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837765#comment-16837765
 ] 

Sophie Blee-Goldman commented on KAFKA-8315:


[~the4thamigo_uk] After I directed you to check out RecordQueue I went back to 
look over it and filed the ticket John linked: you're right, it actually was 
going by partition time rather than by the timestamp of the head record. I've 
opened a simple PR with the fix 
[here|https://github.com/apache/kafka/pull/6719].

That said, I'm not sure this actually affects your use case. If all the data is 
in order, partition time should be the same as the head record's timestamp, so 
this should only come into play when processing out of order data. In your 
example above, partition A would have streamtime = 1 when first choosing the 
next record, as it will not have seen the record with timestamp 4 yet.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>





[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-21 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845381#comment-16845381
 ] 

Sophie Blee-Goldman commented on KAFKA-7994:


Hi [~Yohan123], I think we need to be careful in assuming a singular view of 
streamtime across tasks or even within a single task. Rather than it being an 
obstacle that different subtopologies can't "talk" to one another and pass 
along a single stream time, I think this actually enforces correctness – each 
node has its own sense of time and it doesn't make sense for them to look 
upstream for the time as seen by a different node. See 
https://github.com/apache/kafka/pull/6278

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to decide whether 
> to process out-of-order records or drop them if they are late (i.e., 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., 
> -1) for tasks that are newly created (or migrated). In effect, we forget 
> the current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing but is not dropped after a 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note that this is a corner case: if we stopped processing one record 
> earlier, i.e., after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all StreamTasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct value of 11. This requires us 
> to also propagate the latest observed partition time downstream. 
> That means the sources located at the head of the topology must forward the 
> partition time to their subtopologies whenever records are sent.
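
The r1..r4 example from the description can be replayed with a small JDK-only sketch. The class and method names are hypothetical and this is not the Kafka Streams implementation; it only applies the rule quoted above (a record is late when timestamp < stream-time - grace-period) starting from a given initial stream-time.

```java
import java.util.ArrayList;
import java.util.List;

class StreamTimeSketch {
    static final long UNKNOWN = -1L; // initial stream-time for a new or migrated task

    /**
     * Replays the timestamps in order, starting from initialStreamTime, and
     * returns those dropped as late (timestamp < streamTime - gracePeriodMs).
     */
    static List<Long> droppedAsLate(long initialStreamTime, long gracePeriodMs, long... timestamps) {
        long streamTime = initialStreamTime;
        List<Long> dropped = new ArrayList<>();
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts); // stream-time never moves backwards
            if (ts < streamTime - gracePeriodMs) {
                dropped.add(ts);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Uninterrupted run over r1(0) r2(5) r3(11) r4(2): r4 is dropped.
        System.out.println(droppedAsLate(UNKNOWN, 5, 0, 5, 11, 2));
        // Restart after r3 with stream-time forgotten: r4 is processed.
        System.out.println(droppedAsLate(UNKNOWN, 5, 2));
        // Restart after r3 with stream-time restored to 11: r4 is dropped again.
        System.out.println(droppedAsLate(11, 5, 2));
    }
}
```

The third call models the proposed fix under the assumption that the last observed time (11) is recovered on restart, for example from committed offset metadata.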





[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-23 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16847033#comment-16847033
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


How many state stores are present in your topology? How many partitions? 

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  





[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842778#comment-16842778
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


Bummer. Thanks for helping with the investigation – it'll be helpful to know if 
this affects 2.1 as well.

Was your 2.0.1 app basically the same as your 2.2 app? No chance of a new 
iterator that isn't being closed causing this?

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>





[jira] [Comment Edited] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842778#comment-16842778
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8367 at 5/17/19 11:11 PM:
--

Bummer. Thanks for helping with the investigation – it'll be helpful to know if 
this affects 2.1 as well.

Was your 2.0.1 app basically the same as your 2.2 app? Actually, would you be 
able to share your code? (No chance this is just an unclosed iterator pinning 
resources I assume)


was (Author: ableegoldman):
Bummer. Thanks for helping with the investigation – it'll be helpful to know if 
this affects 2.1 as well.

Was your 2.0.1 app basically the same as your 2.2 app? No chance of a new 
iterator that isn't being closed causing this?

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>





[jira] [Updated] (KAFKA-8324) User constructed RocksObjects leak memory

2019-05-17 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8324:
---
Affects Version/s: 0.10.1.0

> User constructed RocksObjects leak memory
> -
>
> Key: KAFKA-8324
> URL: https://issues.apache.org/jira/browse/KAFKA-8324
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
> Fix For: 2.3.0
>
>
> Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
> take Rocks objects as parameters. Many of these – including potentially large 
> objects like Cache and Filter – inherit from AbstractNativeReference and must 
> be closed explicitly in order to free the memory of the backing C++ object. 
> However the user has no way of closing any objects they have created in 
> RocksDBConfigSetter, and we do not ever close them for them. 
> KIP-453: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter]
>  





[jira] [Commented] (KAFKA-8347) Choose next record to process by timestamp

2019-05-20 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844233#comment-16844233
 ] 

Sophie Blee-Goldman commented on KAFKA-8347:


Oh I missed the RC1 email but you're right, will fix

> Choose next record to process by timestamp
> --
>
> Key: KAFKA-8347
> URL: https://issues.apache.org/jira/browse/KAFKA-8347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Currently PartitionGroup will determine the next record to process by 
> choosing the partition with the lowest stream time. However if a partition 
> contains out of order data its stream time may be significantly larger than 
> the timestamp of the next record. The next record should instead be chosen as 
> the record with the lowest timestamp across all partitions, regardless of 
> which partition it comes from or what its partition time is.





[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp

2019-05-20 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8347:
---
Fix Version/s: (was: 2.2.1)
   2.2.2

> Choose next record to process by timestamp
> --
>
> Key: KAFKA-8347
> URL: https://issues.apache.org/jira/browse/KAFKA-8347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.2
>
>
> Currently PartitionGroup will determine the next record to process by 
> choosing the partition with the lowest stream time. However if a partition 
> contains out of order data its stream time may be significantly larger than 
> the timestamp of the next record. The next record should instead be chosen as 
> the record with the lowest timestamp across all partitions, regardless of 
> which partition it comes from or what its partition time is.





[jira] [Updated] (KAFKA-8324) User constructed RocksObjects leak memory

2019-05-03 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8324:
---
Description: 
Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
take Rocks objects as parameters. Many of these – including potentially large 
objects like Cache and Filter – inherit from AbstractNativeReference and must 
be closed explicitly in order to free the memory of the backing C++ object. 
However the user has no way of closing any objects they have created in 
RocksDBConfigSetter, and we do not ever close them for them. 

 

  was:
Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
take Rocks objects as parameters. Many of these--including potentially large 
objects like Cache and Filter-- inherit from AbstractNativeReference and must 
be closed explicitly in order to free the memory of the backing C++ object. 
However the user has no way of closing any objects they have created in 
RocksDBConfigSetter, and we do not ever close them for them. 

 


> User constructed RocksObjects leak memory
> -
>
> Key: KAFKA-8324
> URL: https://issues.apache.org/jira/browse/KAFKA-8324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
> take Rocks objects as parameters. Many of these – including potentially large 
> objects like Cache and Filter – inherit from AbstractNativeReference and must 
> be closed explicitly in order to free the memory of the backing C++ object. 
> However the user has no way of closing any objects they have created in 
> RocksDBConfigSetter, and we do not ever close them for them. 
>  





[jira] [Created] (KAFKA-8323) Memory leak of BloomFilter Rocks object

2019-05-03 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8323:
--

 Summary: Memory leak of BloomFilter Rocks object
 Key: KAFKA-8323
 URL: https://issues.apache.org/jira/browse/KAFKA-8323
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0, 2.2.1
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman


Any RocksJava object that inherits from org.rocksdb.AbstractNativeReference 
must be closed explicitly in order to free up the memory of the backing C++ 
object. The BloomFilter extends RocksObject (which inherits from 
AbstractNativeReference) and should also be closed in RocksDBStore#close to 
avoid leaking memory.
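
The ownership rule can be illustrated without RocksDB on the classpath. In the sketch below, FakeNativeFilter and FakeStore are hypothetical stand-ins (not org.rocksdb or Kafka Streams classes); the point is only that the owner of a native-backed object releases it in its own close().

```java
class NativeHandleSketch {
    /** Hypothetical stand-in for an object backed by native memory; not an org.rocksdb class. */
    static final class FakeNativeFilter implements AutoCloseable {
        static int open = 0; // handles whose "native" memory is still allocated
        private boolean closed = false;

        FakeNativeFilter() { open++; }

        @Override
        public void close() {
            if (!closed) { // idempotent: release the backing memory only once
                closed = true;
                open--;
            }
        }
    }

    /** Hypothetical store that owns its filter and releases it when the store is closed. */
    static final class FakeStore implements AutoCloseable {
        private final FakeNativeFilter filter = new FakeNativeFilter();

        @Override
        public void close() {
            filter.close(); // without this call the handle would stay open
        }
    }

    /** Opens and closes a store, then reports how many handles are still open. */
    static int openAfterUse() {
        try (FakeStore store = new FakeStore()) {
            // reads and writes against the store would happen here
        }
        return FakeNativeFilter.open;
    }

    public static void main(String[] args) {
        System.out.println("open handles after close: " + openAfterUse());
    }
}
```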





[jira] [Created] (KAFKA-8324) User constructed RocksObjects leak memory

2019-05-03 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8324:
--

 Summary: User constructed RocksObjects leak memory
 Key: KAFKA-8324
 URL: https://issues.apache.org/jira/browse/KAFKA-8324
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
take Rocks objects as parameters. Many of these--including potentially large 
objects like Cache and Filter-- inherit from AbstractNativeReference and must 
be closed explicitly in order to free the memory of the backing C++ object. 
However the user has no way of closing any objects they have created in 
RocksDBConfigSetter, and we do not ever close them for them. 
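
One possible shape for a fix is a close hook on the config setter. The sketch below assumes a hypothetical ConfigSetter interface, a hypothetical FakeCache type and a made-up option key; none of these are the real RocksDBConfigSetter or org.rocksdb API. It only shows a setter remembering what it created so it can release it when the store closes.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigSetterCloseSketch {
    /** Hypothetical stand-in for a native-backed cache; not an org.rocksdb class. */
    static final class FakeCache implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() { closed = true; }
    }

    /** Hypothetical setter interface with a close hook; signatures are assumptions. */
    interface ConfigSetter {
        void setConfig(String storeName, Map<String, Object> options);
        void close(String storeName);
    }

    /** Remembers the cache it creates per store so it can release it later. */
    static final class CachingSetter implements ConfigSetter {
        final Map<String, FakeCache> caches = new HashMap<>();

        @Override
        public void setConfig(String storeName, Map<String, Object> options) {
            FakeCache cache = new FakeCache();
            caches.put(storeName, cache);
            options.put("block.cache", cache); // hypothetical option key
        }

        @Override
        public void close(String storeName) {
            FakeCache cache = caches.remove(storeName);
            if (cache != null) {
                cache.close(); // frees the object the user constructed in setConfig
            }
        }
    }

    public static void main(String[] args) {
        CachingSetter setter = new CachingSetter();
        Map<String, Object> options = new HashMap<>();
        setter.setConfig("store-1", options);
        setter.close("store-1");
        System.out.println("closed: " + ((FakeCache) options.get("block.cache")).closed);
    }
}
```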

 





[jira] [Updated] (KAFKA-8323) Memory leak of BloomFilter Rocks object

2019-05-03 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8323:
---
Affects Version/s: (was: 2.2.1)
   (was: 2.3.0)
   2.2.0
Fix Version/s: 2.2.1
   2.3.0

> Memory leak of BloomFilter Rocks object
> ---
>
> Key: KAFKA-8323
> URL: https://issues.apache.org/jira/browse/KAFKA-8323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.3.0, 2.2.1
>
>
> Any RocksJava object that inherits from org.rocksdb.AbstractNativeReference 
> must be closed explicitly in order to free up the memory of the backing C++ 
> object. The BloomFilter extends RocksObject (which inherits from 
> AbstractNativeReference) and should also be closed in RocksDBStore#close 
> to avoid leaking memory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8295) Optimize count() using RocksDB merge operator

2019-04-26 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8295:
--

 Summary: Optimize count() using RocksDB merge operator
 Key: KAFKA-8295
 URL: https://issues.apache.org/jira/browse/KAFKA-8295
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


In addition to the regular put/get/delete operations, RocksDB provides a fourth 
operation, merge. This essentially provides an optimized read/update/write path 
in a single operation. One of the built-in (C++) merge operators exposed over 
the Java API is a counter. We should be able to leverage this for a more 
efficient implementation of count().

 

(Note: Unfortunately it seems unlikely we can use this to optimize general 
aggregations, even if RocksJava allowed for a custom merge operator, unless we 
provide a way for the user to specify and connect an aggregator implemented in 
C++. Otherwise the cost of crossing the JNI boundary would be too high to 
yield a net performance benefit.)
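
As an in-memory analogy only (a HashMap stands in for the store and none of this is RocksDB API), the difference between the read/update/write path and a merge-style increment looks roughly like this. The class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory analogy only: a HashMap stands in for the store; this is not RocksDB API.
final class CountMergeSketch {
    private final Map<String, Long> store = new HashMap<>();
    int reads = 0; // explicit get() calls issued by the caller

    // Read/update/write: the caller fetches the old count, adds one, and writes it back.
    void countWithGetPut(String key) {
        reads++;
        Long old = store.get(key);
        store.put(key, old == null ? 1L : old + 1L);
    }

    // Merge-style: the caller only submits the increment; combining is the store's job.
    void countWithMerge(String key) {
        store.merge(key, 1L, Long::sum);
    }

    long get(String key) {
        return store.getOrDefault(key, 0L);
    }
}
```

The merge variant never issues a separate read from the caller's side, which is the property count() would benefit from if the built-in counter operator is usable.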





[jira] [Assigned] (KAFKA-8609) Add consumer metrics for rebalances (part 9)

2019-06-27 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8609:
--

Assignee: Sophie Blee-Goldman

> Add consumer metrics for rebalances (part 9)
> 
>
> Key: KAFKA-8609
> URL: https://issues.apache.org/jira/browse/KAFKA-8609
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> We would like to track some additional metrics on the consumer side related 
> to rebalancing as part of this KIP, including
>  
> 1) total rebalance latency (latency from start to completion of rebalance)
> 2) per-callback latency (time spent in onPartitionsRevoked, 
> onPartitionsAssigned, onPartitionsLost)
> 3) join/sync group latency (response_received_time - request_sent_time)
> 4) rebalance rate (# rebalances/day)





[jira] [Created] (KAFKA-8609) Add consumer metrics for rebalances

2019-06-27 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8609:
--

 Summary: Add consumer metrics for rebalances
 Key: KAFKA-8609
 URL: https://issues.apache.org/jira/browse/KAFKA-8609
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sophie Blee-Goldman


We would like to track some additional metrics on the consumer side related to 
rebalancing as part of this KIP, including

 

1) total rebalance latency (latency from start to completion of rebalance)

2) per-callback latency (time spent in onPartitionsRevoked, 
onPartitionsAssigned, onPartitionsLost)

3) join/sync group latency (response_received_time - request_sent_time)

4) rebalance rate (# rebalances/day)
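
A rough sketch of how items 1) and 4) could be derived from start/end timestamps. The tracker class and its method names are hypothetical and not part of the consumer API; the join/sync latency in 3) would be the same subtraction applied to request and response times.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tracker; class and method names are illustrative, not consumer API.
final class RebalanceMetricsSketch {
    private final List<Long> latenciesMs = new ArrayList<>();

    // Record one completed rebalance from its start and end timestamps (milliseconds).
    void record(long startMs, long endMs) {
        latenciesMs.add(endMs - startMs);
    }

    // Average latency over all recorded rebalances, or 0 if none were recorded.
    double avgLatencyMs() {
        return latenciesMs.stream().mapToLong(Long::longValue).average().orElse(0.0);
    }

    // Largest latency over all recorded rebalances, or 0 if none were recorded.
    long maxLatencyMs() {
        return latenciesMs.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    // Rebalances per day over an observation window of the given length.
    double ratePerDay(long windowMs) {
        double days = windowMs / (24.0 * 60 * 60 * 1000);
        return latenciesMs.size() / days;
    }
}
```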





[jira] [Updated] (KAFKA-8609) Add consumer metrics for rebalances (part 9)

2019-06-27 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8609:
---
Summary: Add consumer metrics for rebalances (part 9)  (was: Add consumer 
metrics for rebalances)

> Add consumer metrics for rebalances (part 9)
> 
>
> Key: KAFKA-8609
> URL: https://issues.apache.org/jira/browse/KAFKA-8609
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> We would like to track some additional metrics on the consumer side related 
> to rebalancing as part of this KIP, including
>  
> 1) total rebalance latency (latency from start to completion of rebalance)
> 2) per-callback latency (time spent in onPartitionsRevoked, 
> onPartitionsAssigned, onPartitionsLost)
> 3) join/sync group latency (response_received_time - request_sent_time)
> 4) rebalance rate (# rebalances/day)





[jira] [Comment Edited] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-07-10 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880745#comment-16880745
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8367 at 7/10/19 4:26 PM:
-

Hey [~pavelsavov]! Sorry for the long silence. I believe we've tracked down 
another memory leak, which if you're able to test it out can be found here: 
[https://github.com/apache/kafka/pull/7049]

This issue seems like it actually does affect 2.1 in addition to 2.2, but the 
patch was confirmed to fix the leak someone else was experiencing using a 
similar setup to yours on 2.2 so please give it a try.


was (Author: ableegoldman):
Hey [~pavelsavov]! Sorry for the long silence. I believe we've tracked down 
another memory leak, which if you're able to test it out can be found here: 
[https://github.com/apache/kafka/pull/7049]

It may not be the only leak, as it affects 2.1 as well as 2.2, but we believe 
it's still an important fix. If this does not help we'll keep digging

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  





[jira] [Updated] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-10 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8637:
---
Affects Version/s: 2.2.0
   2.3.0
 Priority: Blocker  (was: Major)
Fix Version/s: 2.3.1
   2.2.2
   2.1.2

> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.3.0
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.1.2, 2.2.2, 2.3.1
>
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak as the WriteBatches are no longer closed automatically





[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-10 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16882516#comment-16882516
 ] 

Sophie Blee-Goldman commented on KAFKA-4212:


Agreed, this wouldn't be possible within the current RocksDBConfigSetter. I 
think the idea was to extend its functionality somehow to make TTL possible 
while still keeping all the RocksDB-related configs in the same place, and 
making it clear that this is not "Streams retention" but "RocksDB's TTL".

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.





[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880893#comment-16880893
 ] 

Sophie Blee-Goldman commented on KAFKA-4212:


I agree with Matthias that, if we do want to go this route, we should do so by 
just exposing TTL through RocksDB rather than adding a new kind of StateStore 
as a first-class citizen, to make it clear we are just falling back to 
RocksDB's TTL functionality. The store hierarchy is already complicated enough 
and we should avoid adding layers as much as possible.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.





[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880886#comment-16880886
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:


You're right, I overlooked that InternalMockProcessorContext is not part of the 
public API and definitely shouldn't be used. In general users are encouraged to 
do their testing through the TopologyTestDriver. You can access state stores 
through it and they will be managed for you so you don't have to worry about 
calling init or close. (This is especially important with the persistent 
variants of the stores as they will leak memory if not properly closed.)

That said, I can see how this would be useful even if not necessary. If you are 
interested in picking this up, I'd be happy to help! Just to outline the 
problem clearly, along with some possible solutions:

This affects all window and session stores, and may affect key-value stores in 
the future should we choose to add metrics. The problem is that the stores cast 
to InternalProcessorContext in order to access the StreamsMetrics, but the 
MockProcessorContext intended for testing does not implement it. I don't 
think it's ideal to just add "if" guards everywhere this internal context or 
the related metrics/sensors are used in the actual streams code just to make 
room for some test code.

One possibility would be to have the Internal and Mock processor contexts 
implement a common interface that has a `metrics()` method, and just return a 
dummy metrics in the Mock case. Alternatively we could add some kind of Mock 
stores that can hide the internals and just present a stable interface for 
testing. Either way we would need a KIP, although I believe there already was 
a KIP regarding mock stores, so maybe this could be a part of that.
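
To illustrate the first possibility, a minimal sketch of the shared-interface idea follows. All of the types here are hypothetical stand-ins (none are the actual Kafka Streams classes); the point is only that a store which depends on the common interface needs no cast in init, and the mock side can hand back a no-op metrics object.

```java
// Hypothetical types throughout; none of these are the actual Kafka Streams classes.
interface MetricsSketch {
    void record(String sensor, double value);
}

// The shared interface both context flavours would implement.
interface MetricsAwareContext {
    MetricsSketch metrics();
}

// Stand-in for the internal context: counts every recorded value.
final class InternalContextSketch implements MetricsAwareContext {
    int recorded = 0;

    @Override
    public MetricsSketch metrics() {
        return (sensor, value) -> recorded++;
    }
}

// Stand-in for the mock context: hands back a no-op ("dummy") metrics object.
final class MockContextSketch implements MetricsAwareContext {
    @Override
    public MetricsSketch metrics() {
        return (sensor, value) -> { };
    }
}

// A store that depends only on the shared interface, so init needs no cast.
final class WindowStoreSketch {
    private MetricsSketch metrics;

    void init(MetricsAwareContext context) {
        metrics = context.metrics();
    }

    void put(String key, String value) {
        metrics.record("put", 1.0);
    }
}
```

With this shape the same store code runs against either context, so no "if" guards are needed around the metrics calls.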

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor {
>   private ProcessorContext context;
>   private WindowStore windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> 

[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880771#comment-16880771
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:


Yes, you're likely to hit this problem with some of the other store 
implementations as well which also cast to `InternalProcessorContext`. This is 
needed in order to access the metrics. However you should be able to run your 
tests by just using an `InternalMockProcessorContext` instead.

 

[~fetherolfjd] Can I close this as "not a problem" ?

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor {
>   private ProcessorContext context;
>   private WindowStore windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 

[jira] [Created] (KAFKA-8627) Investigate batching on state restore

2019-07-03 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8627:
--

 Summary: Investigate batching on state restore
 Key: KAFKA-8627
 URL: https://issues.apache.org/jira/browse/KAFKA-8627
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


Currently when rebuilding state from scratch, we form batches based on whatever 
is returned by poll() and write them to RocksDB. Given the structure of 
RocksDB, inserting large sorted batches gives the best performance when writing 
large amounts of data at once, so we should investigate the potential 
restore-time improvement from:

1) Larger batches: tuning the restore consumer to return larger amounts of 
data, buffering records into larger batches, or both

2) Sorting batches

These two factors are likely to be coupled, so we should explore the 
performance gains/hits by varying both if possible (i.e. turn sorting on/off 
with a variety of batch sizes).
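
A rough sketch of the two knobs, buffering across polls and optional sorting. The class is hypothetical and the "write" is just an append to a list rather than a RocksDB call; the comparator assumes unsigned bytewise key ordering, which would need to match whatever comparator the store actually uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restore buffer; the flush target is a plain list, not a RocksDB write.
final class RestoreBatcherSketch {
    private final int maxBatchSize;
    private final boolean sortBatches;
    private final List<byte[]> buffer = new ArrayList<>();
    final List<List<byte[]>> flushed = new ArrayList<>(); // batches in the order they were written

    RestoreBatcherSketch(int maxBatchSize, boolean sortBatches) {
        this.maxBatchSize = maxBatchSize;
        this.sortBatches = sortBatches;
    }

    // Accept whatever one poll() returned; flush only once enough keys have accumulated.
    void add(List<byte[]> polledKeys) {
        buffer.addAll(polledKeys);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Write out the buffered keys as one batch, sorted first if sorting is enabled.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<byte[]> batch = new ArrayList<>(buffer);
        if (sortBatches) {
            batch.sort(RestoreBatcherSketch::compareBytes);
        }
        flushed.add(batch);
        buffer.clear();
    }

    // Unsigned lexicographic comparison of two keys.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Varying maxBatchSize and sortBatches independently would give the grid of configurations to benchmark.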





[jira] [Created] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8637:
--

 Summary: WriteBatch objects leak off-heap memory
 Key: KAFKA-8637
 URL: https://issues.apache.org/jira/browse/KAFKA-8637
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Sophie Blee-Goldman


In 2.1 we did some refactoring that led to the WriteBatch objects in 
#restoreAllInternal being created in a separate method, rather than in a 
try-with-resources statement. This causes a memory leak as the WriteBatches are 
no longer closed automatically.
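
For illustration, the two shapes look roughly like this. BatchSketch is a hypothetical stand-in for WriteBatch (the real class holds off-heap memory), and RestoreSketch is not the actual store code; the static counter only exists to make the leak visible.

```java
// Hypothetical stand-in for a WriteBatch; the real class holds off-heap memory.
final class BatchSketch implements AutoCloseable {
    static int open = 0; // batches created but not yet closed

    BatchSketch() {
        open++;
    }

    void put(String key, String value) {
        // buffered write, omitted in this sketch
    }

    @Override
    public void close() {
        open--;
    }
}

final class RestoreSketch {
    // Leaky shape: the batch is created in a helper method and nobody closes it.
    static void restoreLeaky(String key, String value) {
        BatchSketch batch = newBatch();
        batch.put(key, value);
    }

    private static BatchSketch newBatch() {
        return new BatchSketch();
    }

    // Safe shape: try-with-resources closes the batch even if put() throws.
    static void restoreSafe(String key, String value) {
        try (BatchSketch batch = new BatchSketch()) {
            batch.put(key, value);
        }
    }
}
```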





[jira] [Updated] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8637:
---
Description: In 2.1 we did some refactoring that led to the WriteBatch 
objects in RocksDBSegmentedBytesStore#restoreAllInternal being created in a 
separate method, rather than in a try-with-resources statement as used 
elsewhere. This causes a memory leak as the WriteBatches are no longer closed 
automatically  (was: In 2.1 we did some refactoring that led to the WriteBatch 
objects in #restoreAllInternal being created in a separate method, rather than 
in a try-with-resources statement. This causes a memory leak as the WriteBatchs 
are no longer closed automatically)

> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak as the WriteBatches are no longer closed automatically





[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880745#comment-16880745
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


Hey [~pavelsavov]! Sorry for the long silence. I believe we've tracked down 
another memory leak, which if you're able to test it out can be found here: 
[https://github.com/apache/kafka/pull/7049]

It may not be the only leak, as it affects 2.1 as well as 2.2, but we believe 
it's still an important fix. If this does not help we'll keep digging

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  





[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-01 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876516#comment-16876516
 ] 

Sophie Blee-Goldman commented on KAFKA-4212:


Hi [~jamesritt], I think your PR is a good first step towards a TTL KV store 
but it's not sufficient to simply expose the rocksdb TTL option. The changelog 
topic is the ultimate source of truth for a store, so if you don't somehow 
clean up old records there you will just restore all the old and deleted 
records on restore/rebalance. One solution might be to somehow store/retrieve 
the current time such that after a restore/rebalance you only load "valid" 
records (see KAFKA-7934)

 

(Also as a note, Rocks TTL is not "strict" in that it provides a lower, not 
upper, bound on how long data will remain in the db.)
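
A very rough sketch of the "only load valid records" idea, under the assumption that each changelog record carries a write timestamp and that the current time is known at restore. The class, the map-based changelog model and the parameter names are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: changelog records are modelled as key -> write timestamp (ms).
final class TtlRestoreSketch {
    // Keep only the records that are still within the TTL at restore time.
    static Map<String, Long> restoreValid(Map<String, Long> changelog, long nowMs, long ttlMs) {
        Map<String, Long> restored = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : changelog.entrySet()) {
            if (nowMs - entry.getValue() < ttlMs) {
                restored.put(entry.getKey(), entry.getValue());
            }
        }
        return restored;
    }
}
```

This only filters on the way in; the expired records would still sit in the changelog topic until something cleans them up there.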

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.





[jira] [Created] (KAFKA-8603) Document upgrade path

2019-06-25 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8603:
--

 Summary: Document upgrade path
 Key: KAFKA-8603
 URL: https://issues.apache.org/jira/browse/KAFKA-8603
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer, streams
Reporter: Sophie Blee-Goldman


Users need to follow a specific upgrade path in order to smoothly and safely 
perform a live upgrade. We should very clearly document the steps needed to 
upgrade a Consumer and a Streams app (note they will be different).





[jira] [Updated] (KAFKA-8609) Add consumer metrics for rebalances (part 9)

2019-06-28 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8609:
---
Description: 
We would like to track some additional metrics on the consumer side related to 
rebalancing as part of this KIP, including
 # listener callback latency
 ## partitions-revoked-time-avg
 ## partitions-revoked-time-max
 ## partitions-assigned-time-avg
 ## partitions-assigned-time-max
 ## partitions-lost-time-avg
 ## partitions-lost-time-max
 # rebalance rate (# rebalances per day)
 ## rebalance-rate

  was:
We would like to track some additional metrics on the consumer side related to 
rebalancing as part of this KIP, including
 # total rebalance latency (latency from start to completion of rebalance)
 ## rebalance-time-avg
 ## rebalance-time-max
 # listener callback latency
 ## partitions-revoked-time-avg
 ## partitions-revoked-time-max
 ## partitions-assigned-time-avg
 ## partitions-assigned-time-max
 ## partitions-lost-time-avg
 ## partitions-lost-time-max
 # rebalance rate (# rebalances per day)
 ## rebalance-rate


> Add consumer metrics for rebalances (part 9)
> 
>
> Key: KAFKA-8609
> URL: https://issues.apache.org/jira/browse/KAFKA-8609
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> We would like to track some additional metrics on the consumer side related 
> to rebalancing as part of this KIP, including
>  # listener callback latency
>  ## partitions-revoked-time-avg
>  ## partitions-revoked-time-max
>  ## partitions-assigned-time-avg
>  ## partitions-assigned-time-max
>  ## partitions-lost-time-avg
>  ## partitions-lost-time-max
>  # rebalance rate (# rebalances per day)
>  ## rebalance-rate
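For readers unfamiliar with the -avg / -max naming in the list above, here is a 
minimal sketch of what such a pair of values tracks for one callback. It uses 
only plain Java and is not the consumer's metrics code; the class and method 
names are hypothetical.

```java
// Hypothetical illustration only; not part of the Kafka consumer.
final class CallbackLatencySketch {
    private long count;
    private long totalMs;
    private long maxMs;

    // Record how long one listener callback invocation took.
    void record(long elapsedMs) {
        count++;
        totalMs += elapsedMs;
        maxMs = Math.max(maxMs, elapsedMs);
    }

    // Corresponds to a "...-time-avg" style value; 0.0 if nothing was recorded.
    double avg() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    // Corresponds to a "...-time-max" style value.
    long max() {
        return maxMs;
    }
}
```

A caller would time each onPartitionsRevoked / onPartitionsAssigned / 
onPartitionsLost invocation and pass the elapsed time to record().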





[jira] [Updated] (KAFKA-8609) Add consumer metrics for rebalances (part 9)

2019-06-28 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8609:
---
Description: 
We would like to track some additional metrics on the consumer side related to 
rebalancing as part of this KIP, including
 # total rebalance latency (latency from start to completion of rebalance)
 ## rebalance-time-avg
 ## rebalance-time-max
 # listener callback latency
 ## partitions-revoked-time-avg
 ## partitions-revoked-time-max
 ## partitions-assigned-time-avg
 ## partitions-assigned-time-max
 ## partitions-lost-time-avg
 ## partitions-lost-time-max
 # rebalance rate (# rebalances per day)
 ## rebalance-rate

  was:
We would like to track some additional metrics on the consumer side related to 
rebalancing as part of this KIP, including

 

1) total rebalance latency (latency from start to completion of rebalance)

2) per-callback latency (time spent in onPartitionsRevoked, 
onPartitionsAssigned, onPartitionsLost)

3) join/sync group latency (response_received_time - request_sent_time)

4) rebalance rate (# rebalances/day)


> Add consumer metrics for rebalances (part 9)
> 
>
> Key: KAFKA-8609
> URL: https://issues.apache.org/jira/browse/KAFKA-8609
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> We would like to track some additional metrics on the consumer side related 
> to rebalancing as part of this KIP, including
>  # total rebalance latency (latency from start to completion of rebalance)
>  ## rebalance-time-avg
>  ## rebalance-time-max
>  # listener callback latency
>  ## partitions-revoked-time-avg
>  ## partitions-revoked-time-max
>  ## partitions-assigned-time-avg
>  ## partitions-assigned-time-max
>  ## partitions-lost-time-avg
>  ## partitions-lost-time-max
>  # rebalance rate (# rebalances per day)
>  ## rebalance-rate





[jira] [Commented] (KAFKA-8041) Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll

2019-08-12 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905426#comment-16905426
 ] 

Sophie Blee-Goldman commented on KAFKA-8041:


h3. java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/950/testReport/junit/kafka.server/LogDirFailureTest/testIOExceptionDuringLogRoll/]

> Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
> -
>
> Key: KAFKA-8041
> URL: https://issues.apache.org/jira/browse/KAFKA-8041
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/236/tests]
> {quote}java.lang.AssertionError: Expected some messages
> at kafka.utils.TestUtils$.fail(TestUtils.scala:357)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:787)
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:189)
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:63){quote}
> STDOUT
> {quote}[2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition topic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-10 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-4 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:45:00,248] ERROR Error while rolling log segment for topic-0 
> in dir 
> /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216
>  (kafka.server.LogDirFailureChannel:76)
> java.io.FileNotFoundException: 
> /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216/topic-0/.index
>  (Not a directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
> at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:121)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
> at kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:184)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:184)
> at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:501)
> at kafka.log.Log.$anonfun$roll$8(Log.scala:1520)
> at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1520)
> at scala.Option.foreach(Option.scala:257)
> at kafka.log.Log.$anonfun$roll$2(Log.scala:1520)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1881)
> at kafka.log.Log.roll(Log.scala:1484)
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:154)
> at 
> 

[jira] [Assigned] (KAFKA-8731) InMemorySessionStore throws NullPointerException on startup

2019-07-29 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8731:
--

Assignee: Sophie Blee-Goldman

> InMemorySessionStore throws NullPointerException on startup
> ---
>
> Key: KAFKA-8731
> URL: https://issues.apache.org/jira/browse/KAFKA-8731
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Jonathan Gordon
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> I receive a NullPointerException on startup when trying to use the new 
> InMemorySessionStore via Stores.inMemorySessionStore(...) using the DSL.
> Here's the stack trace:
> {{ERROR [2019-07-29 21:56:52,246] 
> org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
> [trace_indexer-c8439020-12af-4db2-ad56-3e58cd56540f-StreamThread-1] 
> Encountered the following error during processing:}}
> {{! java.lang.NullPointerException: null}}
> {{! at 
> org.apache.kafka.streams.state.internals.InMemorySessionStore.remove(InMemorySessionStore.java:123)}}
> {{! at 
> org.apache.kafka.streams.state.internals.InMemorySessionStore.put(InMemorySessionStore.java:115)}}
> {{! at 
> org.apache.kafka.streams.state.internals.InMemorySessionStore.lambda$init$0(InMemorySessionStore.java:93)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$1(StateRestoreCallbackAdapter.java:47)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:867)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)}}
> {{! at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)}}
>  
> Here's the Slack thread:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1564438647169600]
>  
> Here's a current PR aimed at fixing the issue:
> [https://github.com/apache/kafka/pull/7132]
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8807) Flaky Test GlobalThreadShutDownOrderTest.shouldFinishGlobalStoreOperationOnShutDown

2019-08-15 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8807:
--

 Summary: Flaky Test 
GlobalThreadShutDownOrderTest.shouldFinishGlobalStoreOperationOnShutDown
 Key: KAFKA-8807
 URL: https://issues.apache.org/jira/browse/KAFKA-8807
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24229/testReport/junit/org.apache.kafka.streams.integration/GlobalThreadShutDownOrderTest/shouldFinishGlobalStoreOperationOnShutDown/]

 
h3. Error Message

java.lang.AssertionError: expected:<[1, 2, 3, 4]> but was:<[1, 2, 3, 4, 1, 2, 
3, 4]>
h3. Stacktrace

java.lang.AssertionError: expected:<[1, 2, 3, 4]> but was:<[1, 2, 3, 4, 1, 2, 
3, 4]> at org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.failNotEquals(Assert.java:835) at 
org.junit.Assert.assertEquals(Assert.java:120) at 
org.junit.Assert.assertEquals(Assert.java:146) at 
org.apache.kafka.streams.integration.GlobalThreadShutDownOrderTest.shouldFinishGlobalStoreOperationOnShutDown(GlobalThreadShutDownOrderTest.java:138)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:412) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 

[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-08-19 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910715#comment-16910715
 ] 

Sophie Blee-Goldman commented on KAFKA-4212:


[~jamesritt] By the way, you might want to look into FIFO compaction. It's 
basically a rocksdb compaction style that deletes the oldest data when the 
total size exceeds some threshold. If all you care about is preventing 
unbounded growth then this may be even closer to what you want than TTL, and 
you should be able to set it using the RocksDBConfigSetter.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Reopened] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-08-14 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reopened KAFKA-7912:


The fix for this caused a performance regression and is being rolled back. We 
should look into alternative ways to support concurrent modifications to the 
underlying data structure.

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.
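To illustrate the hazard described above, here is a minimal single-threaded 
sketch using plain java.util collections. It is not the Streams in-memory store 
code; the class and method names are hypothetical. It shows that an iterator 
over a subMap view breaks when the backing map is written to, and that copying 
the range first avoids this at the cost of the copy.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only; not the Kafka Streams store implementation.
final class SubMapViewSketch {

    private static NavigableMap<String, String> newStore() {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        return store;
    }

    // Returns true if iterating a subMap view fails after the backing map changes.
    static boolean viewIteratorFailsAfterWrite() {
        NavigableMap<String, String> store = newStore();

        // subMap returns a live view backed by the original map, not a copy.
        Iterator<Map.Entry<String, String>> it =
            store.subMap("a", true, "c", true).entrySet().iterator();
        it.next();

        // A structural write to the backing map while the iterator is open ...
        store.put("d", "4");

        try {
            it.next(); // ... makes the fail-fast iterator throw.
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copying the range first gives the iterator its own snapshot.
    static int snapshotIterationCount() {
        NavigableMap<String, String> store = newStore();
        NavigableMap<String, String> snapshot =
            new TreeMap<>(store.subMap("a", true, "c", true));

        store.put("d", "4"); // does not affect the snapshot

        int seen = 0;
        for (Map.Entry<String, String> ignored : snapshot.entrySet()) {
            seen++;
        }
        return seen;
    }
}
```

With real concurrent access from another thread the failure is not guaranteed 
to surface as an exception, which is part of what makes the view unsafe.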





[jira] [Created] (KAFKA-8802) ConcurrentSkipListMap shows performance regression in cache and in-memory store

2019-08-14 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8802:
--

 Summary: ConcurrentSkipListMap shows performance regression in 
cache and in-memory store
 Key: KAFKA-8802
 URL: https://issues.apache.org/jira/browse/KAFKA-8802
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Sophie Blee-Goldman
 Fix For: 2.4.0, 2.3.1


A significant performance regression was seen between 2.2 and 2.3. For 
in-memory stores, about half of it was improved by removing the 
ConcurrentSkipListMap. 





[jira] [Commented] (KAFKA-8802) ConcurrentSkipListMap shows performance regression in cache and in-memory store

2019-08-20 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911663#comment-16911663
 ] 

Sophie Blee-Goldman commented on KAFKA-8802:


Hi [~f2005...@gmail.com], I can't really give an exact number since it depends 
on so many factors.

> ConcurrentSkipListMap shows performance regression in cache and in-memory 
> store
> ---
>
> Key: KAFKA-8802
> URL: https://issues.apache.org/jira/browse/KAFKA-8802
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> The use of ConcurrentSkipListMap in the cache and in-memory stores caused a 
> performance regression in 2.3.0. We should revert back to using TreeMap 





[jira] [Updated] (KAFKA-8802) ConcurrentSkipListMap shows performance regression in cache and in-memory store

2019-08-20 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8802:
---
Description: The use of ConcurrentSkipListMap in the cache and in-memory 
stores caused a performance regression in 2.3.0. We should revert back to using 
TreeMap   (was: A significant performance regression was seen between 2.2 and 
2.3. For in-memory stores, about half of it was improved by removing the 
ConcurrentSkipListMap. )

> ConcurrentSkipListMap shows performance regression in cache and in-memory 
> store
> ---
>
> Key: KAFKA-8802
> URL: https://issues.apache.org/jira/browse/KAFKA-8802
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> The use of ConcurrentSkipListMap in the cache and in-memory stores caused a 
> performance regression in 2.3.0. We should revert back to using TreeMap 





[jira] [Commented] (KAFKA-8831) Joining a new instance sometimes does not cause rebalancing

2019-08-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16916186#comment-16916186
 ] 

Sophie Blee-Goldman commented on KAFKA-8831:


Ah, is this the one that's trace because it will supposedly spam the logs? So 
the problem was that you had two non-isolated instances set to the same state 
dir? That seems to make sense since one of them would grab the lock and the 
other would be stuck retrying forever. I think Streams tends to assume only one 
instance per machine, so we don't even consider the deadlock of two instances 
trying for the same file lock. 

We definitely shouldn't just retry infinitely while logging only at the lowest 
level. Someone with more context ([~guozhang]) will have to chime in as to why 
we expect to see this LockException so often that it would spam the logs and 
necessitate indefinite retries. At the very least, we should log at a higher 
level if we are retrying for the Nth time for some large N (again, I don't have 
enough context to know what a reasonable value would be). Personally, I feel we 
should just rethrow the exception if we have retried past some threshold rather 
than just spin in deadlock.
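To make the suggestion concrete, here is a minimal sketch of a bounded retry 
that logs quietly at first, escalates after a few attempts, and finally 
rethrows. It is plain Java rather than the actual Streams task initialization 
code; the class name, the warnAfter and maxAttempts parameters, and the list 
used as a stand-in logger are all hypothetical.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration only; not the Kafka Streams retry logic.
final class BoundedRetrySketch {

    // Runs the action up to maxAttempts times. Failures are logged at TRACE
    // until warnAfter attempts have been made, then at WARN. Once the limit is
    // reached the last failure is rethrown wrapped in an IllegalStateException.
    static <T> T retry(Supplier<T> action, int warnAfter, int maxAttempts, List<String> log) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                String level = attempt >= warnAfter ? "WARN" : "TRACE";
                log.add(level + " attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }
}
```

What the right values for the two thresholds would be is exactly the open 
question above; the sketch only shows the shape of the control flow.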

> Joining a new instance sometimes does not cause rebalancing
> ---
>
> Key: KAFKA-8831
> URL: https://issues.apache.org/jira/browse/KAFKA-8831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Chris Pettitt
>Assignee: Chris Pettitt
>Priority: Major
> Attachments: StandbyTaskTest.java, fail.log
>
>
> See attached log. The application is in a REBALANCING state. The second 
> instance joins a bit after the first instance (~250ms). The group coordinator 
> says it is going to rebalance but nothing happens. The first instance gets 
> all partitions (2). The application transitions to RUNNING.
> See attached test, which starts one client and then starts another about 
> 250ms later. This seems to consistently repro the issue for me.
> This is blocking my work on KAFKA-8755, so I'm inclined to pick it up





[jira] [Commented] (KAFKA-8831) Joining a new instance sometimes does not cause rebalancing

2019-08-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16916165#comment-16916165
 ] 

Sophie Blee-Goldman commented on KAFKA-8831:


[~vinoth] Just to clarify, do you delete all topics, then run the reset tool, 
then recreate (only) the input topics? And are you also wiping the state 
(either manually or with KafkaStreams#cleanUp)?

> Joining a new instance sometimes does not cause rebalancing
> ---
>
> Key: KAFKA-8831
> URL: https://issues.apache.org/jira/browse/KAFKA-8831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Chris Pettitt
>Assignee: Chris Pettitt
>Priority: Major
> Attachments: StandbyTaskTest.java, fail.log
>
>
> See attached log. The application is in a REBALANCING state. The second 
> instance joins a bit after the first instance (~250ms). The group coordinator 
> says it is going to rebalance but nothing happens. The first instance gets 
> all partitions (2). The application transitions to RUNNING.
> See attached test, which starts one client and then starts another about 
> 250ms later. This seems to consistently repro the issue for me.
> This is blocking my work on KAFKA-8755, so I'm inclined to pick it up





[jira] [Commented] (KAFKA-8831) Joining a new instance sometimes does not cause rebalancing

2019-08-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16916203#comment-16916203
 ] 

Sophie Blee-Goldman commented on KAFKA-8831:


>>Ah, is this the one that's trace because it will supposedly spam the logs?
I was referring to this comment in AssignedTasks#initializeNewTasks – "// made 
this trace as it will spam the logs in the poll loop."  :) 

Maybe I'm missing your point, but it doesn't seem like we should move a task to 
restored (or running) if we are unable to open the state store (not that we 
should just blindly keep retrying either). I think this is mostly a matter of 
better logging and/or documentation and/or reasonable retry limits

> Joining a new instance sometimes does not cause rebalancing
> ---
>
> Key: KAFKA-8831
> URL: https://issues.apache.org/jira/browse/KAFKA-8831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Chris Pettitt
>Assignee: Chris Pettitt
>Priority: Major
> Attachments: StandbyTaskTest.java, fail.log
>
>
> See attached log. The application is in a REBALANCING state. The second 
> instance joins a bit after the first instance (~250ms). The group coordinator 
> says it is going to rebalance but nothing happens. The first instance gets 
> all partitions (2). The application transitions to RUNNING.
> See attached test, which starts one client and then starts another about 
> 250ms later. This seems to consistently repro the issue for me.
> This is blocking my work on KAFKA-8755, so I'm inclined to pick it up




