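Hi,

Both streams need to be co-partitioned, i.e., both inputs must have the same number of partitions and be partitioned by the join key. If you change the key of one join input, you need to re-partition that stream on the new key via .through(). You should create the topic you use in through() manually, with the same number of partitions as the other join input, before you start your Kafka Streams application.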
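A minimal plain-Java simulation (no Kafka dependency) of why this matters: each join task only sees one partition of each input, and a record's partition is fixed by its key when it is written, so re-keying it with map() does not move it. The partitioner here (`Math.floorMod(key.hashCode(), n)`), the user names, and the session-keyed right-hand topic are hypothetical; Kafka's default partitioner actually hashes the serialized key bytes with murmur2. Re-partitioning is modeled as recomputing each record's partition from its new key, which is roughly what writing through a topic keyed by the new key does.

```java
import java.util.ArrayList;
import java.util.List;

public class CoPartitioningDemo {

    static final int NUM_PARTITIONS = 3;

    // Hypothetical partitioner for illustration only; Kafka's default one
    // hashes the serialized key with murmur2, but any deterministic hash
    // shows the same effect.
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // A record as a stream task sees it: its current key and the partition it lives in.
    static final class Rec {
        final String key;
        final int partition;

        Rec(String key, int partition) {
            this.key = key;
            this.partition = partition;
        }
    }

    static final String[] USERS = {"alice", "bob", "carol", "dave", "erin", "frank"};

    // Left input: topic keyed by userId, so each record sits in partitionFor(userId).
    static List<Rec> leftStream() {
        List<Rec> out = new ArrayList<>();
        for (String user : USERS) {
            out.add(new Rec(user, partitionFor(user)));
        }
        return out;
    }

    // Right input: topic keyed by a hypothetical sessionId; map() re-keys each
    // record to userId. Without re-partitioning, the record stays where the old
    // key put it; with re-partitioning, its partition is recomputed from the new key.
    static List<Rec> rightStream(boolean repartition) {
        List<Rec> out = new ArrayList<>();
        for (String user : USERS) {
            String sessionId = "session-" + user;
            int partition = repartition ? partitionFor(user) : partitionFor(sessionId);
            out.add(new Rec(user, partition));
        }
        return out;
    }

    // Each task joins only the same-numbered partitions of both inputs, so a pair
    // is found only if both records share the key AND the partition.
    static int localJoinMatches(List<Rec> left, List<Rec> right) {
        int matches = 0;
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && l.partition == r.partition) {
                    matches++;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println("matches without re-partitioning: "
                + localJoinMatches(leftStream(), rightStream(false)) + " of " + USERS.length);
        System.out.println("matches after re-partitioning:   "
                + localJoinMatches(leftStream(), rightStream(true)) + " of " + USERS.length);
    }
}
```

After re-partitioning, every pair is found; without it, the number of matches depends on where the old keys happened to hash. Kafka Streams cannot look inside a map() lambda, so it has to assume the key may have changed even when it did not.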
(In a future release, this re-partitioning will happen automatically.)

In your case, if I did not miss anything, you need to re-partition both streams, as you apply map() to both before the join:

KStream streamA = stream1.map(...).through(...);
KStream streamB = stream2.map(...).through(...);
KStream joinResult = streamA.join(streamB, ...);

-Matthias

On 07/14/2016 03:26 AM, vivek thakre wrote:
> Yes, both topics have the same number of partitions and the same
> partition key, i.e., userId.
> If I just join the streams without applying the map functions (in this
> case userClickStream and userEventStream), it works.
>
> Thanks,
> Vivek
>
>
> On Wed, Jul 13, 2016 at 4:53 PM, Philippe Derome <phder...@gmail.com> wrote:
>
>> Did you specify the same number of partitions for the two input topics you are
>> joining? I think that this is usually the first thing people ask to verify
>> with errors similar to yours.
>>
>> If you are experimenting with learning some concepts, it is simpler to
>> always use one partition for your topics.
>> On 13 Jul 2016 7:40 p.m., "vivek thakre" <vivek.tha...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I want to join 2 topics (KStreams).
>>>
>>> Stream 1
>>> Topic : userIdClicks
>>> Key : userId
>>> Value : JSON string with event details
>>>
>>> Stream 2
>>> Topic : userIdChannel
>>> Key : userId
>>> Value : JSON string with event details, including a channel value
>>>
>>> I could not find any examples with a KStream-to-KStream join.
>>>
>>> Here is my code:
>>>
>>> // build stream userIdClicks
>>> KStream<String, String> userClickStream = builder.stream(stringSerde,
>>>         stringSerde, "userClicks");
>>>
>>> // create stream -> <userId, 1 (count)>
>>> KStream<String, Long> userClickCountStream = userClickStream
>>>         .filter((userId, record) -> userId != null)
>>>         .map((userId, record) -> new KeyValue<>(userId, 1L));
>>>
>>> // build stream userChannelStream
>>> KStream<String, String> userEventStream = builder.stream(stringSerde,
>>>         stringSerde, "userEvents");
>>>
>>> // create stream <userId, channel>: extract channel value from JSON string
>>> KStream<String, String> userChannelStream = userEventStream
>>>         .filter((userId, record) -> userId != null)
>>>         .map((userId, record) -> new KeyValue<>(userId,
>>>                 JsonPath.read(record, "$.event.page.channel").toString()));
>>>
>>> // join userClickCountStream with userChannelStream
>>> KTable<String, Long> clicksPerChannel = userClickCountStream
>>>         .join(userChannelStream, new ValueJoiner<Long, String, ChannelWithClicks>() {
>>>             @Override
>>>             public ChannelWithClicks apply(Long clicks, String channel) {
>>>                 return new ChannelWithClicks(channel == null ? "UNKNOWN" : channel, clicks);
>>>             }
>>>         }, JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000)) // 30 secs before and after
>>>         .map((user, channelWithClicks) -> new KeyValue<>(channelWithClicks.getChannel(),
>>>                 channelWithClicks.getClicks()))
>>>         .reduceByKey(
>>>                 (firstClicks, secondClicks) -> firstClicks + secondClicks,
>>>                 stringSerde, longSerde, "ClicksPerChannelUnwindowed");
>>>
>>> When I run this topology, I get an exception:
>>>
>>> Invalid topology building: KSTREAM-MAP-0000000003 and
>>> KSTREAM-MAP-0000000006 are not joinable
>>>
>>> I am looking for a way to join 2 KStreams.
>>>
>>> Thanks,
>>>
>>> Vivek
>>>
>>
>
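Since the original question asked for a KStream-to-KStream join example, here is a plain-Java sketch (no Kafka dependency) of what the quoted topology is meant to compute once its join inputs are co-partitioned. It assumes inner-join semantics with the ±30 s window from the question: a click at time t joins a channel event for the same userId whose timestamp lies in [t - 30000, t + 30000] ms. The joined click counts are then summed per channel, as reduceByKey would do. The Click and ChannelEvent classes and the sample data are hypothetical, and the sketch only computes final totals, whereas the real KTable would emit an update for each joined record.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedJoinSketch {

    // Hypothetical record types standing in for the two input streams.
    static final class Click {
        final String userId;
        final long timestampMs;
        final long count;

        Click(String userId, long timestampMs, long count) {
            this.userId = userId;
            this.timestampMs = timestampMs;
            this.count = count;
        }
    }

    static final class ChannelEvent {
        final String userId;
        final long timestampMs;
        final String channel;

        ChannelEvent(String userId, long timestampMs, String channel) {
            this.userId = userId;
            this.timestampMs = timestampMs;
            this.channel = channel;
        }
    }

    // Mirrors before(30000) / after(30000) from the question.
    static final long BEFORE_MS = 30_000L;
    static final long AFTER_MS = 30_000L;

    // Inner windowed join on userId, then sums the joined click counts per channel.
    static Map<String, Long> clicksPerChannel(List<Click> clicks, List<ChannelEvent> events) {
        Map<String, Long> totals = new TreeMap<>();
        for (Click click : clicks) {
            for (ChannelEvent event : events) {
                boolean sameKey = click.userId.equals(event.userId);
                boolean inWindow = event.timestampMs >= click.timestampMs - BEFORE_MS
                        && event.timestampMs <= click.timestampMs + AFTER_MS;
                if (sameKey && inWindow) {
                    // Same fallback as the question's ValueJoiner.
                    String channel = event.channel == null ? "UNKNOWN" : event.channel;
                    totals.merge(channel, click.count, Long::sum);
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                new Click("alice", 1_000L, 1L),
                new Click("alice", 100_000L, 1L), // no alice event within 30 s: not joined
                new Click("bob", 5_000L, 1L));
        List<ChannelEvent> events = List.of(
                new ChannelEvent("alice", 10_000L, "sports"),
                new ChannelEvent("bob", 20_000L, "news"),
                new ChannelEvent("carol", 5_000L, "news")); // no carol click: not joined
        System.out.println(clicksPerChannel(clicks, events)); // prints {news=1, sports=1}
    }
}
```

Because the window is symmetric here, it does not matter which side is treated as the left input.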