Each operator that needs to use a Serde has a corresponding overloaded
method that allows you to override the Serde. If you don't override it,
the operator uses the Serde from the config.
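That lookup rule is easy to state precisely. A minimal sketch in plain Java (no Kafka dependency; `SerdeRef` and `SerdeResolver` are hypothetical stand-ins, not Kafka API):

```java
// Stand-in for org.apache.kafka.common.serialization.Serde, just to
// illustrate the lookup rule described above.
class SerdeRef {
    final String name;
    SerdeRef(String name) { this.name = name; }
}

class SerdeResolver {
    // An operator-level Serde, when supplied, wins over the config default.
    static SerdeRef resolve(SerdeRef operatorSerde, SerdeRef configDefault) {
        return operatorSerde != null ? operatorSerde : configDefault;
    }
}
```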

> If one gets the default
> serializer wrong then she gets run time errors in serialization /
> de-serialization (ClassCastException etc.)
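The failure mode quoted above can be simulated without Kafka: a serializer built for String values compiles fine against `Object`, and the bad cast only surfaces when a non-String value actually flows through it (`CastDemo` and `stringSerializer` are illustrative names, not Kafka API):

```java
import java.util.function.Function;

class CastDemo {
    // A "serializer" that assumes String values, as a String default Serde
    // does. Passing any other type fails only at run time, when data flows.
    static final Function<Object, byte[]> stringSerializer =
        v -> ((String) v).getBytes();

    public static void main(String[] args) {
        stringSerializer.apply("ok");  // fine: the value really is a String
        stringSerializer.apply(42L);   // throws ClassCastException at run time
    }
}
```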

Default Serdes are helpful if you use a generic format like Avro
throughout the whole topology. If you have many different types, it
might be better to set the default Serdes to `null` and set the Serde
for each operator individually.
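For reference, the config-level defaults are ordinary properties; a sketch assuming the stock Serde class names shipped with Kafka 1.0+ (`DefaultSerdeConfig` is a hypothetical helper, not part of Kafka):

```java
import java.util.Properties;

class DefaultSerdeConfig {
    // Config keys as defined by StreamsConfig (Kafka 1.0+). Omit these
    // entries (or pass Serdes per operator) when the topology mixes many
    // different types.
    static Properties build() {
        Properties props = new Properties();
        props.put("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }
}
```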


-Matthias

On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> Thanks a lot for the clear answer.
> 
> One of the concerns that I have is that it's not always obvious when the
> default serializers are used. e.g. it looks like KGroupedStream#reduce also
> uses the default serializer under the hood. If one gets the default
> serializer wrong then she gets run time errors in serialization /
> de-serialization (ClassCastException etc.), which are quite hard to track
> down.
> 
> On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> For stream-table-join, only the table is (de)serialized; the stream side
>> is only piped through and does lookups into the table.
>>
>> And when reading the stream
>> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129)
>> the Serdes from the config are overridden by parameters passed into
>> `#stream()`.
>>
>> The default Serdes are used when reading/writing from/to a topic/store
>> (including repartition or changelog) and if the operator does not
>> overwrite the default Serdes via passed-in parameters.
>>
>>
>> -Matthias
>>
>> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
>>> The inputs to the leftJoin are the stream with [String, Long] and the
>>> table with [String, String]. Is the default serializer (I mean from the
>>> config) used for [String, String] ? Then how does the [String, Long]
>>> serialization work ?
>>>
>>> I guess the basic issue that I am trying to understand is how the default
>>> serialisers (stringSerde, stringSerde) registered in config are used for
>>> serialising the inputs of leftJoin ..
>>>
>>> regards.
>>>
>>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> userClicksJoinRegion is never serialized...
>>>>
>>>> It is the result of the join, and the join only (de)serializes its
>>>> input in the internal stores.
>>>>
>>>> The output is forwarded in-memory to a consecutive map that returns
>>>> `clicksByRegion`, which is [String, Long].
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 2/10/18 1:17 PM, Ted Yu wrote:
>>>>> Please read the javadoc:
>>>>>
>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
>>>>>
>>>>> and correlate with the sample code.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Looking at
>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
>>>>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
>>>>>> which means the value is a tuple of (String, Long) .. I am not able to
>>>>>> get how this will serialize/de-serialize with the default serializers
>>>>>> which are both stringSerde for keys and values.
>>>>>>
>>>>>> or am I missing something ?
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> If I read the code correctly, the operation on this line prepares
>>>>>>> the input for the (stringSerde, stringSerde) specified on line 142:
>>>>>>>
>>>>>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
>>>>>>>         (if (region == null) "UNKNOWN" else region, clicks))
>>>>>>>
>>>>>>> FYI
>>>>>>>
>>>>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi -
>>>>>>>>
>>>>>>>> I was going through this example at
>>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
>>>>>>>> especially the leftJoin part
>>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
>>>>>>>> This leftJoin returns KStream[String, (String, Long)], while default
>>>>>>>> serializers are String for both key and value as in
>>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
>>>>>>>> My question is how does this serialization work here ? I mean how
>>>>>>>> does the tuple get serialized with the default serializers ? And
>>>>>>>> leftJoin only works with default serializers ..
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>
>>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>>
> 
> 
