Hi John,

This was very helpful. However, I am still confused about when to set the names for Materialized and Grouped. I am setting the names mainly so that the state stores and internal topics have definite names that are identifiable for debugging purposes.
So when we set a name, do we also need to set the serdes for the key/value types? If not, what defaults are used?

I'll explain with a quick example. My original code was:

    table = stream.map((k, v) -> ...).groupByKey().reduce((av, nv) -> nv)

In order to set names for the intermediate stores/topics, I changed the code to:

    table = stream.map((k, v) -> ...).groupByKey(Grouped.with("group", Serde<K>, Serde<V>)).reduce((av, nv) -> nv, Materialized.as("store"))

So I wanted to know: once I create a named Materialized, do I need to set its key/value serdes too? In other words, is this the better code?

    table = stream
        .map((k, v) -> ...)
        .groupByKey(Grouped.with("group", Serde<K>, Serde<V>))
        .reduce((av, nv) -> nv, Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(Serde<K>).withValueSerde(Serde<V>))

Note that I have custom classes for the key and value.

Thanks
Sachin

On Fri, Dec 6, 2019 at 11:02 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Sachin,
>
> The way that Java infers generic arguments makes that case particularly
> obnoxious.
>
> By the way, the problem you're facing is specifically addressed by these
> relatively new features:
> * https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> * https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping
>
> Since this behavior has been under development recently, I thought you
> might benefit from the context.
>
> To answer your question, what you have to do is explicitly mention the
> type arguments to "Materialized.as(name)" when you're using the
> withKeySerde, etc.
>
> It will look something like this:
>
> Materialized
>     .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
>     .withKeySerde(new Serde<KeyType>...)
>     .withValueSerde(new Serde<ValueType>...));
>
> I can explain exactly why this is necessary if you want, but the short
> answer is that the Java type system only makes a rudimentary effort to
> infer types.
>
> FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can
> find a way to fix it, if we ever change the Materialized builder interface.
>
> Hope this helps,
> -John
>
> On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> > Hi,
> > In my application I have names of internal topics like this:
> >
> > ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
> > ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
> > ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
> > ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0
> >
> > Is it possible to set concrete names for these instead of say
> > **KSTREAM-JOINOTHER-0000000059-store**
> >
> > This way I can identify at what code in my DSL is responsible for data
> > inside them.
> >
> > So far I have set names for:
> > Grouped.with
> > Materialized.as
> > Joined.with
> >
> > This has helped me get concrete names at many places however still at some
> > places I see arbitrary names.
> >
> > Also note that somehow this code works
> > Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())
> >
> > But not:
> > Materialized.as("d-l-i-store").withKeySerde(new
> > JSONSerde<K>()).withValueSerde(new TupleJSONSerde<Pair<L, V>>())
> >
> > The error I get is:
> > Description Resource Path Location Type
> > The method withKeySerde(Serde<Object>) in the type
> > Materialized<Object,Object,StateStore> is not applicable for the arguments
> > (JSONSerde<K>)
> >
> > I have my class
> >
> > class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>,
> > Deserializer<T>, Serde<T> {
> > ......
> > }
> >
> > This is pretty much same as from kafka streams typed example.
> >
> > Thanks
> > Sachin
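
For reference, the inference behaviour John mentions in the quoted reply can be reproduced without Kafka at all. The sketch below is only an illustration: Codec, Named and TypeWitnessDemo are hypothetical stand-ins (Codec for a Serde, Named for a builder shaped roughly like Materialized), not Kafka classes. It shows the two usual ways to give the compiler the type arguments: an explicit type witness on the static factory call, or assigning the factory result to a typed variable before chaining.

```java
// Hypothetical stand-ins, not Kafka classes: Codec plays the role of a Serde,
// Named the role of a builder shaped roughly like Materialized.
interface Codec<T> {
    String label();
}

final class Named<K, V> {
    private final String name;
    private String keyLabel = "default";
    private String valueLabel = "default";

    private Named(String name) {
        this.name = name;
    }

    // Generic static factory: K and V must be inferred or stated at the call site.
    static <K, V> Named<K, V> as(String name) {
        return new Named<>(name);
    }

    Named<K, V> withKeyCodec(Codec<K> codec) {
        this.keyLabel = codec.label();
        return this;
    }

    Named<K, V> withValueCodec(Codec<V> codec) {
        this.valueLabel = codec.label();
        return this;
    }

    String describe() {
        return name + "[" + keyLabel + ", " + valueLabel + "]";
    }
}

final class TypeWitnessDemo {
    static final Codec<String> STRING_CODEC = () -> "string";
    static final Codec<Integer> INT_CODEC = () -> "int";

    // Does not compile: the receiver of a chained call has no target type, so
    // K and V are inferred as Object and withKeyCodec expects Codec<Object>.
    //   Named.as("store-name").withKeyCodec(STRING_CODEC);

    // Option 1: state the type arguments explicitly on the factory call.
    static String withTypeWitness() {
        return Named.<String, Integer>as("store-name")
                .withKeyCodec(STRING_CODEC)
                .withValueCodec(INT_CODEC)
                .describe();
    }

    // Option 2: let the declared variable type drive inference, then chain.
    static String withTypedVariable() {
        Named<String, Integer> named = Named.as("store-name");
        return named.withKeyCodec(STRING_CODEC).withValueCodec(INT_CODEC).describe();
    }

    public static void main(String[] args) {
        System.out.println(withTypeWitness());
        System.out.println(withTypedVariable());
    }
}
```

Both methods build the same object. The commented-out line fails with the same kind of error as the one quoted above: a method taking Codec<Object> is not applicable to a Codec<String> argument.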