Damian Guy created KAFKA-5650:
---------------------------------

             Summary: Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)
                 Key: KAFKA-5650
                 URL: https://issues.apache.org/jira/browse/KAFKA-5650
             Project: Kafka
          Issue Type: Bug
            Reporter: Damian Guy
            Assignee: Damian Guy


As per KIP-182:
A new interface will be added:
{code}
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/logging etc.
 */
public interface StateStoreBuilder<T extends StateStore> {
 
    StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withCachingDisabled();
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StateStoreBuilder<T> withLoggingDisabled();
    T build();
}
{code}

This interface will be used to wrap stores with caching, logging etc.
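
To make the chaining concrete, here is a minimal, self-contained sketch of how an implementation of the proposed interface might behave. It is not the Kafka Streams implementation: {{StateStore}} is reduced to a hypothetical one-method stand-in, {{DescribedStore}} and {{DescribedStoreBuilder}} are invented for illustration, and the choice to have caching and logging off by default is an assumption of the sketch, not something this issue specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class StateStoreBuilderSketch {

    /** Hypothetical stand-in for the Kafka Streams StateStore; only a name is modelled. */
    interface StateStore {
        String name();
    }

    /** The interface proposed in this issue, repeated so the sketch compiles on its own. */
    interface StateStoreBuilder<T extends StateStore> {
        StateStoreBuilder<T> withCachingEnabled();
        StateStoreBuilder<T> withCachingDisabled();
        StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
        StateStoreBuilder<T> withLoggingDisabled();
        T build();
    }

    /** Hypothetical store that only remembers which wrappers were requested. */
    static final class DescribedStore implements StateStore {
        private final String name;
        private final List<String> layers;

        DescribedStore(String name, List<String> layers) {
            this.name = name;
            this.layers = Collections.unmodifiableList(new ArrayList<>(layers));
        }

        @Override public String name() { return name; }
        List<String> layers() { return layers; }
    }

    /** Hypothetical builder: each with* call flips a flag, build() applies the flags. */
    static final class DescribedStoreBuilder implements StateStoreBuilder<DescribedStore> {
        private final String name;
        private boolean caching;  // off by default: an assumption of this sketch
        private boolean logging;  // off by default: an assumption of this sketch
        private Map<String, String> logConfig = Collections.emptyMap();

        DescribedStoreBuilder(String name) { this.name = name; }

        @Override public DescribedStoreBuilder withCachingEnabled() { caching = true; return this; }
        @Override public DescribedStoreBuilder withCachingDisabled() { caching = false; return this; }
        @Override public DescribedStoreBuilder withLoggingEnabled(Map<String, String> config) {
            logging = true;
            logConfig = config;
            return this;
        }
        @Override public DescribedStoreBuilder withLoggingDisabled() { logging = false; return this; }

        @Override public DescribedStore build() {
            List<String> layers = new ArrayList<>();
            if (caching) layers.add("caching");
            if (logging) layers.add("logging" + logConfig);
            return new DescribedStore(name, layers);
        }
    }

    public static void main(String[] args) {
        DescribedStore store = new DescribedStoreBuilder("counts")
                .withCachingEnabled()
                .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"))
                .build();
        System.out.println(store.name() + " -> " + store.layers());
    }
}
```

The point of the fluent shape is that the caller decides which wrappers apply, and the builder, not the storage engine, is responsible for applying them in {{build()}}.
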
Additionally, some convenience methods will be added to the {{Stores}} class:

{code}
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(
        final String name,
        final Serde<K> keySerde,
        final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(
        final String name,
        final Serde<K> keySerde,
        final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(
        final String name,
        final int capacity,
        final Serde<K> keySerde,
        final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(
        final String name,
        final Windows windows,
        final Serde<K> keySerde,
        final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(
        final String name,
        final SessionWindows windows,
        final Serde<K> keySerde,
        final Serde<V> valueSerde)

/**
 * The following methods are for use with the PAPI. They allow building of
 * StateStores that can be wrapped with caching, logging, and any other
 * convenient wrappers provided by the KafkaStreams library.
 */
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(
        final StateStoreSupplier<WindowStore<K, V>> supplier)

public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
        final StateStoreSupplier<KeyValueStore<K, V>> supplier)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(
        final StateStoreSupplier<SessionStore<K, V>> supplier)
{code}
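
The supplier-to-builder flow described above could look roughly like the following from the point of view of a custom storage engine. This is a sketch under stated assumptions, not the proposed API itself: every type here is a simplified, hypothetical stand-in ({{MapStore}} plays the custom engine, {{LoggedStore}} plays a logging wrapper), the serde parameters are omitted for brevity, and the builder is reduced to the logging switch only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SupplierToBuilderSketch {

    // Hypothetical stand-ins; the real Kafka Streams types carry more methods.
    interface StateStore { String name(); }

    interface KeyValueStore<K, V> extends StateStore {
        void put(K key, V value);
        V get(K key);
    }

    interface StateStoreSupplier<T extends StateStore> { T get(); }

    /** Plays the role of a custom storage engine: a plain HashMap. */
    static final class MapStore<K, V> implements KeyValueStore<K, V> {
        private final String name;
        private final Map<K, V> data = new HashMap<>();
        MapStore(String name) { this.name = name; }
        @Override public String name() { return name; }
        @Override public void put(K key, V value) { data.put(key, value); }
        @Override public V get(K key) { return data.get(key); }
    }

    /** Hypothetical logging wrapper: records every write, then delegates. */
    static final class LoggedStore<K, V> implements KeyValueStore<K, V> {
        private final KeyValueStore<K, V> inner;
        final List<String> changelog = new ArrayList<>();
        LoggedStore(KeyValueStore<K, V> inner) { this.inner = inner; }
        @Override public String name() { return inner.name(); }
        @Override public void put(K key, V value) {
            changelog.add(key + "=" + value);
            inner.put(key, value);
        }
        @Override public V get(K key) { return inner.get(key); }
    }

    /** Reduced builder (logging switch only); the config map is ignored in this sketch. */
    static final class KeyValueStoreBuilder<K, V> {
        private final StateStoreSupplier<KeyValueStore<K, V>> supplier;
        private boolean logging;  // off by default: an assumption of this sketch

        KeyValueStoreBuilder(StateStoreSupplier<KeyValueStore<K, V>> supplier) {
            this.supplier = supplier;
        }
        KeyValueStoreBuilder<K, V> withLoggingEnabled(Map<String, String> config) {
            logging = true;
            return this;
        }
        KeyValueStoreBuilder<K, V> withLoggingDisabled() { logging = false; return this; }

        KeyValueStore<K, V> build() {
            KeyValueStore<K, V> store = supplier.get();
            if (logging) {
                return new LoggedStore<K, V>(store);
            }
            return store;
        }
    }

    /** Mirrors the shape of the proposed keyValueStoreBuilder(supplier) method. */
    static <K, V> KeyValueStoreBuilder<K, V> keyValueStoreBuilder(
            StateStoreSupplier<KeyValueStore<K, V>> supplier) {
        return new KeyValueStoreBuilder<K, V>(supplier);
    }

    public static void main(String[] args) {
        // The custom engine only has to provide a supplier for its bare store.
        StateStoreSupplier<KeyValueStore<String, Long>> custom = () -> new MapStore<>("counts");
        Map<String, String> noConfig = Collections.emptyMap();
        KeyValueStore<String, Long> store =
                keyValueStoreBuilder(custom).withLoggingEnabled(noConfig).build();
        store.put("a", 1L);
        System.out.println(store.name() + ": a=" + store.get("a"));
    }
}
```

In this arrangement the custom engine never needs to know about the wrappers; it hands over a supplier and receives back a store that already has the requested layers applied.
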



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
