[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417959#comment-16417959
 ] 

Cemalettin Koç commented on KAFKA-6713:
---

Hi [~guozhang], 

For 1 and 2)

I would like to access my customized version of `CategoryInMemoryStore`. I will 
be glad if you write something simple for me. I could not follow at your 
comment.

 

3) My category topic is updated rarely. I need all data and I am using 
GlobalKTable and I am using some rendering at my pages. Since it is changing 
rarely, I would like to cache it but in case an update I would like to 
invalidate this cache. However currently I could not find a way to be notified 
in case a my GlobalKTable based InMemoryStore is updated. If a new Category 
added or changed in my InMemoryStore, I would like to trigger invalidation of 
my rendering cache.

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entites and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement  a custom 
> `KeyValueBytesStoreSupplier`,  `BytesTypeConverter` and 
>  
> {code:java}
> public class DelegatingByteStore implements KeyValueStore byte[]> {
>   private BytesTypeConverter converter;
>   private KeyValueStore delegated;
>   public DelegatingByteStore(KeyValueStore delegated, 
> BytesTypeConverter converter) {
> this.converter = converter;
> this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
>   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List> innerEntries(final List> from);
>   List> outerEntries(final List> from);
>   V outerValue(final IV value);
>   KeyValue outerKeyValue(final KeyValue from);
>   KeyValueinnerKeyValue(final KeyValue entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414606#comment-16414606
 ] 

Guozhang Wang commented on KAFKA-6713:
--

Hi [~cemo], I've read your code, and here are some follow-up questions / 
clarifications:

1. StreamsBuilder.globalTable is expecting a {{Materialized>}} parameter and hence with `Materialized.as` one 
should only pass in a {{KeyValueBytesStoreSupplier}} that generates a 
{{KeyValueStore}}. So you do not need to template your 
{{DelegatingByteStore}} but just let it to use a `{{KeyValueStore delegated}} internally. Using a converter after the serde will 
unnecessarily calling serdes three times other than one: when a  pair is 
passed in, you woud first use the key/value serde to serialize it into bytes, 
and then deserialize it in your {{DelegatingByteStore}} implementation with the 
converter into  again, and then when with the in-memory  store it 
will once again serialize it into bytes before putting to cache.

2. In your code you are re-using the same {{DelegatingByteStore}} in your 
{{KeyValueBytesStoreSupplier}}, i.e. whenever `get()` is called it will always 
return the same store object. Is it intentional? Note that although it is fine 
for now since we will only call `get()` once across all threads for global 
store, this is an internal implementation detail that maybe changed. To be 
safer I'd suggest you generate a new object in your supplier per each `get()` 
call.

3. About your invalidation use cases, I'm not sure I can follow completely... 
could you elaborate a bit more?

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entites and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement  a custom 
> `KeyValueBytesStoreSupplier`,  `BytesTypeConverter` and 
>  
> {code:java}
> public class DelegatingByteStore implements KeyValueStore byte[]> {
>   private BytesTypeConverter converter;
>   private KeyValueStore delegated;
>   public DelegatingByteStore(KeyValueStore delegated, 
> BytesTypeConverter converter) {
> this.converter = converter;
> this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
>   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List> innerEntries(final List> from);
>   List> outerEntries(final List> from);
>   V outerValue(final IV value);
>   KeyValue outerKeyValue(final KeyValue from);
>   KeyValueinnerKeyValue(final KeyValue entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414477#comment-16414477
 ] 

Cemalettin Koç commented on KAFKA-6713:
---

Today I came across another scenario which is still not very clear for me. :) 
For my use case I have chosen our "Category" entity since It is updated rarely 
and it is data cardinality is suitable for a newbie Kafka user. :) 

GlobalKTable is very nice since it is magically filling our in memory stores. 
However there are some cases I need to invalidate some computed data which is 
based whole data of store. 

Please forgive my ignorance since I have just started to use Kafka but I 
thought It would be nice to have something like KStream as GlobalKStream. :) I 
can process and trigger some invalidation in case a new updated category? 

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entites and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement  a custom 
> `KeyValueBytesStoreSupplier`,  `BytesTypeConverter` and 
>  
> {code:java}
> public class DelegatingByteStore implements KeyValueStore byte[]> {
>   private BytesTypeConverter converter;
>   private KeyValueStore delegated;
>   public DelegatingByteStore(KeyValueStore delegated, 
> BytesTypeConverter converter) {
> this.converter = converter;
> this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
>   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List> innerEntries(final List> from);
>   List> outerEntries(final List> from);
>   V outerValue(final IV value);
>   KeyValue outerKeyValue(final KeyValue from);
>   KeyValueinnerKeyValue(final KeyValue entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414469#comment-16414469
 ] 

Cemalettin Koç commented on KAFKA-6713:
---

Hi [~guozhang], I have copied TypeConverter internal interface and added some 
other methods for my needs. What I want to use actually using a custom in 
memory store. Then I wanted to pass this `category in memory store` to my 
`category service`. 

 

 
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore { // 
implementation }
{code}
 

I have created an instance of `CategoryInMemoryStore` and passed into my 
StreamsBuilder as this:

 
{code:java}
public GlobalKTable categoryKGlobalTable(StreamsBuilder 
streamsBuilder) {
  KeyValueBytesStoreSupplier supplier =
  new DelegatingByteStore<>(categoryInMemoryStore, 
createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
Materialized.as(supplier)
.withCachingDisabled()
.withKeySerde(Serdes.Long())
.withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the files I have attached to create a my in memory key value 
store implementation. 

I have also attached my implementations which are used above. 

 

 

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entites and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement  a custom 
> `KeyValueBytesStoreSupplier`,  `BytesTypeConverter` and 
>  
> {code:java}
> public class DelegatingByteStore implements KeyValueStore byte[]> {
>   private BytesTypeConverter converter;
>   private KeyValueStore delegated;
>   public DelegatingByteStore(KeyValueStore delegated, 
> BytesTypeConverter converter) {
> this.converter = converter;
> this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
>   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List> innerEntries(final List> from);
>   List> outerEntries(final List> from);
>   V outerValue(final IV value);
>   KeyValue outerKeyValue(final KeyValue from);
>   KeyValueinnerKeyValue(final KeyValue entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414323#comment-16414323
 ] 

Guozhang Wang commented on KAFKA-6713:
--

Hi [~cemo] Thanks for reporting this, is very valuable to us. What's puzzles me 
is why you'd need to implement the {{TypeConverter}} interface, as it is an 
internal interface and is not supposed to be enforced to users. Could you share 
more of your code snippet to help me understand better the cumbersomeness. Note 
that if you are using DSL and trying to implement the customized store, you 
should only need to implement the {{KeyValueBytesStoreSupplier}}, and the 
{{KeyValueStore}} it generates. The serdes will be auto-handled 
by the streams library.

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entites and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement  a custom 
> `KeyValueBytesStoreSupplier`,  `BytesTypeConverter` and 
>  
> {code:java}
> public class DelegatingByteStore implements KeyValueStore byte[]> {
>   private BytesTypeConverter converter;
>   private KeyValueStore delegated;
>   public DelegatingByteStore(KeyValueStore delegated, 
> BytesTypeConverter converter) {
> this.converter = converter;
> this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
>   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List> innerEntries(final List> from);
>   List> outerEntries(final List> from);
>   V outerValue(final IV value);
>   KeyValue outerKeyValue(final KeyValue from);
>   KeyValueinnerKeyValue(final KeyValue entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)