Thanks a lot for the clear answer. One concern I have is that it's not always obvious when the default serializers are used. e.g. it looks like KGroupedStream#reduce also uses the default serializer under the hood. If one gets the default serializer wrong, one gets runtime serialization/deserialization errors (ClassCastException etc.), which are quite hard to track down.
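To make the failure mode concrete, here is a minimal self-contained sketch (plain Java, no Kafka dependency; the `Serializer` interface below is just a stand-in for Kafka's `org.apache.kafka.common.serialization.Serializer`) of why a wrong default serializer only fails at runtime: generics are erased, so a `Serializer<String>` accepts any value at compile time, and the hidden cast inside it throws `ClassCastException` only when a record is actually serialized.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatch {

    // Stand-in for Kafka's Serializer<T> interface (not the real class).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    static final Serializer<String> STRING_SERIALIZER =
            (topic, data) -> data.getBytes(StandardCharsets.UTF_8);

    // Simulates the framework invoking a serializer it only holds as a raw type,
    // which is effectively what happens when a default serde is applied to
    // records of the wrong type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String serializeWithDefault(Serializer raw, Object value) {
        try {
            raw.serialize("user-clicks", value);
            return "ok";
        } catch (ClassCastException e) {
            // The erased cast inside the Serializer<String> fails here,
            // at runtime, far from where the wrong default was configured.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Matching type: serialization succeeds.
        System.out.println(serializeWithDefault(STRING_SERIALIZER, "region-a"));
        // A Long where the default serde expects String: runtime failure.
        System.out.println(serializeWithDefault(STRING_SERIALIZER, 42L));
    }
}
```

Note the stack trace in the real case points into the serializer, not into the operator (e.g. reduce) that silently picked up the default, which is what makes it hard to track down.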
On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> For a stream-table join, only the table is (de)serialized; the stream side
> is only piped through and does lookups into the table.
>
> And when reading the stream
> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129),
> the Serdes from the config are overwritten by parameters passed into
> `#stream()`.
>
> The default Serdes are used when reading/writing from/to a topic/store
> (including repartition or changelog topics), and only if the operator does
> not overwrite the default Serdes via passed-in parameters.
>
>
> -Matthias
>
> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> > The inputs to the leftJoin are the stream with [String, Long] and the
> > table with [String, String]. Is the default serializer (I mean from the
> > config) used for [String, String]? Then how does the [String, Long]
> > serialization work?
> >
> > I guess the basic issue that I am trying to understand is how the
> > default serialisers (stringSerde, stringSerde) registered in the config
> > are used for serialising the inputs of leftJoin ..
> >
> > regards.
> >
> > On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> userClicksJoinRegion is never serialized...
> >>
> >> It is the result of the join, and the join only (de)serializes its
> >> input in the internal stores.
> >>
> >> The output is forwarded in-memory to a consecutive map that returns
> >> `clicksByRegion`, which is [String, Long].
> >>
> >>
> >> -Matthias
> >>
> >> On 2/10/18 1:17 PM, Ted Yu wrote:
> >>> Please read the javadoc:
> >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> >>>
> >>> and correlate with the sample code.
> >>>
> >>> Thanks
> >>>
> >>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh
> >>> <ghosh.debas...@gmail.com> wrote:
> >>>
> >>>> Looking at
> >>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
> >>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
> >>>> which means the value is a tuple of (String, Long) .. I am not able
> >>>> to get how this will serialize/de-serialize with the default
> >>>> serializers, which are both stringSerde for keys and values.
> >>>>
> >>>> or am I missing something?
> >>>>
> >>>> regards.
> >>>>
> >>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>>
> >>>>> If I read the code correctly, the operation on this line prepares
> >>>>> the input for the (stringSerde, stringSerde) specified on line 142:
> >>>>>
> >>>>>     .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> >>>>>       (if (region == null) "UNKNOWN" else region, clicks))
> >>>>>
> >>>>> FYI
> >>>>>
> >>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh
> >>>>> <ghosh.debas...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi -
> >>>>>>
> >>>>>> I was going through this example at
> >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
> >>>>>> especially the leftJoin part:
> >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
> >>>>>> This leftJoin returns KStream[String, (String, Long)], while the
> >>>>>> default serializers are String for both key and value, as in
> >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> >>>>>> My question is: how does the serialization work here? I mean, how
> >>>>>> does the tuple get serialized with the default serializers? And
> >>>>>> leftJoin only works with default serializers ..
> >>>>>>
> >>>>>> regards.
> >>>>>>
> >>>>>> --
> >>>>>> Debasish Ghosh
> >>>>>> http://manning.com/ghosh2
> >>>>>> http://manning.com/ghosh
> >>>>>>
> >>>>>> Twttr: @debasishg
> >>>>>> Blog: http://debasishg.blogspot.com
> >>>>>> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
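The Serde rule from the thread (passed-in operator parameters win; otherwise the config defaults apply) can be sketched conceptually. This is not Kafka's actual implementation, just a hypothetical illustration of the precedence, with Serdes represented as plain strings and `resolve` an invented helper name:

```java
import java.util.Map;

public class SerdeResolution {

    // Hypothetical helper: pick the operator-level Serde if one was passed in
    // (e.g. via `#stream()` parameters), else fall back to the config default.
    static String resolve(String operatorSerde, Map<String, String> config, String configKey) {
        return operatorSerde != null ? operatorSerde : config.get(configKey);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "default.key.serde", "StringSerde",
                "default.value.serde", "StringSerde");

        // Operator overrides the default, as when `#stream()` gets explicit Serdes.
        System.out.println(resolve("LongSerde", config, "default.value.serde"));
        // No override: the config default applies (e.g. an operator writing an
        // internal store) -- this is where a wrong default bites at runtime.
        System.out.println(resolve(null, config, "default.value.serde"));
    }
}
```

The first call prints `LongSerde`, the second `StringSerde`, mirroring the two cases Matthias describes: explicit parameters short-circuit the lookup, and only operators that receive no Serdes fall back to `default.key.serde` / `default.value.serde` from the config.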