Vivek, another option for you is to replace the `map()` calls with `mapValues()`. Can you give that a try?
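The difference can be shown with a small toy model in plain Java. This is not Kafka Streams code. `CoPartitionModel`, `ToyStream`, `Rec` and `joinable` are hypothetical names. The model assumes, in the spirit of the explanation in this thread, two things. A stream produced by `map()` is treated as possibly re-keyed. A stream produced by `mapValues()` keeps its original keys, and therefore its original partitioning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Toy model only (NOT the Kafka Streams API): a stream remembers whether its
// keys may have been changed since it was read from its source topic.
public class CoPartitionModel {

    record Rec<K, V>(K key, V value) {}

    static final class ToyStream<K, V> {
        final List<Rec<K, V>> records;
        final boolean keyMayHaveChanged;

        ToyStream(List<Rec<K, V>> records, boolean keyMayHaveChanged) {
            this.records = records;
            this.keyMayHaveChanged = keyMayHaveChanged;
        }

        // Like map(): the function may return any key, so the model has to assume
        // the key changed, even if the function happens to return the same key.
        <K2, V2> ToyStream<K2, V2> map(BiFunction<K, V, Rec<K2, V2>> f) {
            List<Rec<K2, V2>> out = new ArrayList<>();
            for (Rec<K, V> r : records) out.add(f.apply(r.key(), r.value()));
            return new ToyStream<>(out, true);
        }

        // Like mapValues(): only values are touched, so keys (and thus the
        // partitioning) are preserved by construction.
        <V2> ToyStream<K, V2> mapValues(Function<V, V2> f) {
            List<Rec<K, V2>> out = new ArrayList<>();
            for (Rec<K, V> r : records) out.add(new Rec<>(r.key(), f.apply(r.value())));
            return new ToyStream<>(out, keyMayHaveChanged);
        }

        // Simplified stand-in for the check behind "... are not joinable";
        // this is not the real Kafka Streams check.
        static boolean joinable(ToyStream<?, ?> a, ToyStream<?, ?> b) {
            return !a.keyMayHaveChanged && !b.keyMayHaveChanged;
        }
    }

    public static void main(String[] args) {
        List<Rec<String, String>> clicks = List.of(new Rec<>("u1", "{}"));
        ToyStream<String, String> in = new ToyStream<>(clicks, false);

        // Same key is returned, but the operator is map(), so it is flagged.
        ToyStream<String, Long> viaMap = in.map((userId, json) -> new Rec<>(userId, 1L));
        ToyStream<String, Long> viaMapValues = in.mapValues(json -> 1L);

        System.out.println(ToyStream.joinable(viaMap, viaMapValues));       // false
        System.out.println(ToyStream.joinable(viaMapValues, viaMapValues)); // true
    }
}
```

In the model, the `map()` function returns the unchanged key `u1`. The result is still flagged, because the check looks only at which operator was used, not at what the function actually does.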
Background: You are calling `map()` on your two input streams, but in neither of the two cases are you actually changing the key. It is always the userId before the map, and it stays the userId after the map (see below). However, Kafka Streams can't deduce that you are effectively not changing the key during the `map()`, so it can no longer assume that the result is still partitioned by userId. If you use `mapValues()`, then by definition you are only changing values, not modifying the keys.

Idea:

    userClickStream
        .filter((userId, record) -> userId != null)
        .map((userId, record) -> new KeyValue<>(userId, 1L));

    userEventStream
        .filter((userId, record) -> userId != null)
        .map((userId, record) -> new KeyValue<>(userId,
            JsonPath.read(record, "$.event.page.channel").toString()));

should become something like:

    userClickStream
        .filter((userId, record) -> userId != null)
        .mapValues(record -> 1L);

    userEventStream
        .filter((userId, record) -> userId != null)
        .mapValues(record -> JsonPath.read(record, "$.event.page.channel").toString());

Best,
Michael

On Thu, Jul 14, 2016 at 10:24 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> Both streams need to be co-partitioned, i.e., if you change the key of one
> join input, you need to re-partition this stream on the new key via
> .through(). You should create the topic you use in through() manually,
> before you start your Kafka Streams application.
>
> (For a future release, this re-partitioning will happen automatically.)
>
> In your case, if I did not miss anything, you need to re-partition both
> streams, as you apply map() on both before the join.
>
>     KStream streamA = stream1.map(...).through(...);
>     KStream streamB = stream2.map(...).through(...);
>
>     KStream joinResult = streamA.join(streamB, ...);
>
> -Matthias
>
> On 07/14/2016 03:26 AM, vivek thakre wrote:
> > Yes, both topics have the same number of partitions, and also the same
> > partition key, i.e. userId.
> > If I just join the streams without applying the map functions (in this
> > case userClickStream and userEventStream), it works.
> >
> > Thanks,
> > Vivek
> >
> > On Wed, Jul 13, 2016 at 4:53 PM, Philippe Derome <phder...@gmail.com> wrote:
> >
> >> Did you specify the same number of partitions for the two input topics you are
> >> joining? I think that this is usually the first thing people ask to verify
> >> with errors similar to yours.
> >>
> >> If you are experimenting with learning some concepts, it is simpler to
> >> always use one partition for your topics.
> >>
> >> On 13 Jul 2016 7:40 p.m., "vivek thakre" <vivek.tha...@gmail.com> wrote:
> >>
> >>> Hello,
> >>>
> >>> I want to join 2 topics (KStreams).
> >>>
> >>> Stream 1
> >>> Topic : userIdClicks
> >>> Key : userId
> >>> Value : JSON String with event details
> >>>
> >>> Stream 2
> >>> Topic : userIdChannel
> >>> Key : userId
> >>> Value : JSON String with event details and has channel value
> >>>
> >>> I could not find any examples of a KStream-to-KStream join.
> >>>
> >>> Here is my code:
> >>>
> >>>     //build stream userIdClicks (values are JSON strings)
> >>>     KStream<String, String> userClickStream = builder.stream(stringSerde,
> >>>         stringSerde, "userClicks");
> >>>
> >>>     //create stream -> <userId, 1 (count)>
> >>>     KStream<String, Long> userClickCountStream = userClickStream
> >>>         .filter((userId, record) -> userId != null)
> >>>         .map((userId, record) -> new KeyValue<>(userId, 1L));
> >>>
> >>>     //build stream userChannelStream
> >>>     KStream<String, String> userEventStream = builder.stream(stringSerde,
> >>>         stringSerde, "userEvents");
> >>>
> >>>     //create stream <userId, channel> : extract channel value from json string
> >>>     KStream<String, String> userChannelStream = userEventStream
> >>>         .filter((userId, record) -> userId != null)
> >>>         .map((userId, record) -> new KeyValue<>(userId,
> >>>             JsonPath.read(record, "$.event.page.channel").toString()));
> >>>
> >>>     //join userClickCountStream with userChannelStream
> >>>     KTable<String, Long> clicksPerChannel = userClickCountStream
> >>>         .join(userChannelStream, new ValueJoiner<Long, String, ChannelWithClicks>() {
> >>>             @Override
> >>>             public ChannelWithClicks apply(Long clicks, String channel) {
> >>>                 return new ChannelWithClicks(channel == null ? "UNKNOWN" : channel, clicks);
> >>>             }
> >>>         }, JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000)) //30 secs before and after
> >>>         .map((user, channelWithClicks) -> new KeyValue<>(channelWithClicks.getChannel(),
> >>>             channelWithClicks.getClicks()))
> >>>         .reduceByKey(
> >>>             (firstClicks, secondClicks) -> firstClicks + secondClicks,
> >>>             stringSerde, longSerde, "ClicksPerChannelUnwindowed"
> >>>         );
> >>>
> >>> When I run this topology, I get an exception:
> >>>
> >>>     Invalid topology building: KSTREAM-MAP-0000000003 and
> >>>     KSTREAM-MAP-0000000006 are not joinable
> >>>
> >>> I am looking for a way to join 2 KStreams.
> >>>
> >>> Thanks,
> >>>
> >>> Vivek
> >>>
> >>
> >

-- 
Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download
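The co-partitioning requirement discussed in this thread can be sketched with a simple hash partitioner. It comes down to two conditions: the same number of partitions and the same key. `CoPartitioning` and `partitionFor` are hypothetical names. `String.hashCode()` is only a stand-in, because Kafka's default producer partitioner hashes the serialized key bytes with murmur2 instead.

The point is that a record with a given key lands on the same partition number in two topics only under two conditions. Both topics must have the same partition count, and both must have been written with the same partitioner. A key-changing `map()` breaks this guarantee. That is why Matthias suggests writing the re-keyed stream to a new topic via `through()` before the join.

```java
// Toy illustration of co-partitioning (not Kafka client code).
public class CoPartitioning {

    // Hypothetical stand-in for a key-hash partitioner. Kafka's default
    // producer partitioner uses murmur2 over the serialized key bytes instead.
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hashes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] userIds = {"u1", "u2", "u3", "u4", "u5"};
        for (String userId : userIds) {
            int clicksPartition = partitionFor(userId, 4); // e.g. a clicks topic with 4 partitions
            int eventsPartition = partitionFor(userId, 4); // e.g. an events topic with 4 partitions
            int otherPartition = partitionFor(userId, 3);  // a topic with a different partition count
            System.out.printf("%s: clicks=%d events=%d (3-partition topic=%d)%n",
                    userId, clicksPartition, eventsPartition, otherPartition);
        }
    }
}
```

With equal partition counts, the clicks and events partitions always agree. With different counts they can diverge. For example, `partitionFor("c", 4)` returns 3, while `partitionFor("c", 3)` returns 0, because the hash of "c" is 99.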