I don't think that is the issue. The join API says:

public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)

In my case:
V is Map<String, List<X>>
V1 is List<X>
R is Map<String, List<X>>
K is String
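
To make those type bindings concrete, here is a minimal sketch of a joiner with exactly that shape. It is an illustration under assumptions, not the code from the actual topology: String stands in for X, the "other" map key is hypothetical, and ValueJoiner is a local stand-in interface with the same shape as the Kafka Streams one so the example compiles without the Kafka dependency. It only shows that the generics line up; it says nothing about why the topology builder rejects the join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinTypesSketch {

    // Local stand-in mirroring the shape of Kafka Streams' ValueJoiner<V, V1, R>;
    // declared here only so the sketch is self-contained.
    interface ValueJoiner<V, V1, R> {
        R apply(V value1, V1 value2);
    }

    // Hypothetical map key under which the right-hand list is stored.
    static final String OTHER_KEY = "other";

    // V  = Map<String, List<String>>
    // V1 = List<String>
    // R  = Map<String, List<String>>
    // (String stands in for X.)
    static final ValueJoiner<Map<String, List<String>>, List<String>, Map<String, List<String>>> JOINER =
        (left, right) -> {
            // Copy the left-hand map so the input is not mutated.
            Map<String, List<String>> result = new HashMap<>(left);
            result.put(OTHER_KEY, new ArrayList<>(right));
            return result;
        };

    public static void main(String[] args) {
        Map<String, List<String>> left = new HashMap<>();
        left.put("a", Arrays.asList("x1", "x2"));
        List<String> right = Arrays.asList("x3");

        Map<String, List<String>> joined = JOINER.apply(left, right);
        System.out.println(joined);
    }
}
```

Since both tables are keyed by String, a joiner like this satisfies the join signature at compile time.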

Note that the final V and V1 are arrived at after doing transformations on the original streams of type <String, Y>.

So there are intermediate steps like
stream.map(new KeyValueMapper<String, Y, KeyValue<String, X>>())
and
table.mapValues(new ValueMapper<X, Map<String, X>>())

So whenever I modify the structure of a stream or table, do I need to back it up with a new Kafka topic by calling through("new-mapped-topic")?

Thanks
Sachin

On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty <mgai...@hotmail.com> wrote:

> > From: sjmit...@gmail.com
> > Date: Sat, 8 Oct 2016 15:30:33 +0530
> > Subject: Understanding org.apache.kafka.streams.errors.TopologyBuilderException
> > To: users@kafka.apache.org
> >
> > Hi,
> > I am getting this exception
> >
> > Exception in thread "main"
> > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
> > building: KTABLE-MAPVALUES-0000000007 and KSTREAM-AGGREGATE-0000000009 are
> > not joinable
> >
> MG>look at join declaration for org.apache.kafka.streams.kstream.internals.KTableImpl.java
> MG> public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner)
> MG>method join assumes 2 collections that exactly match the generic declaration of join method
>
> MG>KTable<String, Map<String, List<V1>>> != KTable<String, List<V1>> (2nd parameter is missing both V and R declarators)
> MG>you can establish a new collection of KTable<String, List<V1>>
>
> MG>and then *join* KTable<String, Map<String, List<V1>>> into KTable<String, List<V1>> thru custom join method
>
> > What I am trying to do is I aggregate a KStream into a KTable of type
> > KTable<String, Map<String, List<V>>>
> >
> > and I am trying to join it to another KStream which is aggregated into
> > another KTable of type
> > KTable<String, List<V>>
> >
> > Since keys of both the final KTable are same, I don't understand why it is
> > giving this exception.
> >
> > Thanks
> > Sachin