Glad you figured it out.

If you think the docs should be improved, we take PRs :) -- In the end,
it's a broader community effort to have good docs...


-Matthias

On 6/11/21 12:10 PM, Richard Rossel wrote:
>  Thanks Matthias for your help,
> I was able to find the issue -- it was actually my Serdes definition.
> Type problems like this are very hard to track down: the execution
> trace logs are not very helpful, and the documentation regarding
> custom serdes is not that great.
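[Editor's note: the serde bug described above is most easily caught with a round-trip unit test before the serde ever reaches a topology. A minimal, self-contained sketch in plain Java -- the `B` class, the pipe-delimited byte format, and the `toBytes`/`fromBytes` names are all hypothetical stand-ins for Kafka's `Serializer`/`Deserializer` contract, not the actual Kafka API:]

```java
import java.nio.charset.StandardCharsets;

// Hypothetical value type, standing in for B.class from the thread.
class B {
    final int refB;
    final String name;
    B(int refB, String name) { this.refB = refB; this.name = name; }
}

public class SerdeRoundTrip {
    // Serializer half: encode B as "refB|name" (toy format, not JSON).
    static byte[] toBytes(B b) {
        return (b.refB + "|" + b.name).getBytes(StandardCharsets.UTF_8);
    }

    // Deserializer half: must invert toBytes exactly. A mismatch between
    // the two halves is the classic cause of runtime serialization errors
    // like the ClassCastException reported later in this thread.
    static B fromBytes(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 2);
        return new B(Integer.parseInt(parts[0]), parts[1]);
    }

    public static void main(String[] args) {
        B original = new B(42, "signal");
        B copy = fromBytes(toBytes(original));
        if (copy.refB != original.refB || !copy.name.equals(original.name)) {
            throw new AssertionError("serde does not round-trip");
        }
        System.out.println("round-trip ok");
    }
}
```

[The same round-trip property can be asserted against a real Gson-backed serde; testing it in isolation keeps topology errors from hiding serde errors.]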
> 
> Thanks for the tip regarding the need to materialize tables -- I had
> no idea about the "push down" process.
> Do you know where I can find some documentation regarding that process?
> 
> 
> On Tue, Jun 8, 2021 at 5:54 PM Matthias J. Sax <[email protected]> wrote:
>>
>> Can you create a ticket? There is at least one bug I found looking into
>> the code. (At least in `trunk` -- what version are you using? -- we
>> might have fixed a few bugs in newer versions; I could not reproduce
>> all issues you report.)
>>
>>
>> Btw: you will actually need to set the value serde via
>>
>>   .toTable(Materialized.with(null, serdesB))
>>
>> The table needs to be materialized for the join, and thus, you need to
>> specify the serde for `B.class`.
>>
>> The serde specified via `Joined` does not apply atm to the upstream
>> table materialization -- Kafka Streams only "pushes down" serdes from
>> upstream to downstream nodes, but not (yet) in the other direction.
>>
>> (`Joined` is also used for stream-stream joins, and `otherValueSerde` is
>> used there -- for stream-table joins, `otherValueSerde` is ignored atm.)
>>
>> Thus, after a `mapValues()` that changes the value type, we don't have a
>> serde at hand and fall back to the config.
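[Editor's note: the "push down" behavior can be sketched as a toy resolver -- this is an illustration for intuition only, plain Java with invented names, not Kafka's actual internals. Serdes flow from upstream toward downstream only; a type-changing node like `mapValues()` invalidates the inherited serde, and a serde set at a downstream node (the join) is never consulted for upstream materialization, so resolution falls back to the config default:]

```java
import java.util.Arrays;
import java.util.List;

public class SerdePushDown {
    // Toy topology node: serde is an explicit name, null ("inherit from
    // upstream"), or UNKNOWN ("value type changed, previous serde invalid").
    static final String UNKNOWN = "UNKNOWN";
    record Node(String name, String serde) {}

    // Serdes are only "pushed down" from upstream to downstream: to resolve
    // node i we walk back toward the source. A type-changing node breaks the
    // chain, and serdes set at nodes after i are never consulted -- so we
    // fall back to the configured default.
    static String resolve(List<Node> chain, int i) {
        for (int j = i; j >= 0; j--) {
            String s = chain.get(j).serde();
            if (UNKNOWN.equals(s)) break;   // value type changed upstream
            if (s != null) return s;        // nearest explicit serde wins
        }
        return "default-from-config";
    }

    public static void main(String[] args) {
        List<Node> chain = Arrays.asList(
            new Node("source", "StringSerde"),
            new Node("mapValues", UNKNOWN),  // gson.fromJson changed the type
            new Node("toTable", null),       // materialization needs a serde
            new Node("join", "BSerde"));     // Joined.with(...) set only here
        // The table node resolves to the config default, not "BSerde":
        System.out.println(resolve(chain, 2));
    }
}
```

[This is exactly why the join's `Joined.with(...)` serde cannot rescue the upstream `toTable()` materialization, and why the fix is to set the serde at the materialization itself.]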
>>
>>
>> -Matthias
>>
>> On 6/7/21 8:02 PM, Richard Rossel wrote:
>>> Hi List,
>>> I can't get a join to work between a stream that is marked for
>>> repartitioning and a KTable.
>>> The same left join works fine when I don't apply the map operator (for
>>> changing the key), so I must be doing something wrong with the Serdes.
>>>
>>> The topology is:
>>>
>>> 1) Loading a KTable from topic B <Integer,String> :
>>>
>>> final KTable<Integer, B> ktableB = builder
>>>         .stream(topicB, Consumed.with(integerSerde, stringSerde))
>>>         .mapValues(v -> gson.fromJson(v, B.class)).toTable();
>>>
>>> 2) Loading a KStream from topic A <Integer,String>
>>>
>>> builder
>>>         .stream(topicA, Consumed.with(integerSerde, stringSerde))
>>>         .mapValues(v -> gson.fromJson(v, A.class))
>>>
>>> 3) new key on streamA to expose the field with reference to B Ktable
>>>
>>>         .map((k,v)-> new KeyValue<Integer, A>(v.refB, v))
>>>
>>> 4) now that the stream and the table share the same key, I do a left join
>>>
>>>         .leftJoin(signalKtable, joiner, Joined.with(
>>>                 Serdes.Integer(),
>>>                 serdesA,  // serde for the value object in the KStream
>>>                 serdesB)) // serde for the value object in the KTable
>>>
>>> 5) convert the value to a string and send it to the output topic
>>>         .mapValues(v->v.toString())
>>>         .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), stringSerde));
>>>
>>>
>>> First error I got:
>>>
>>> org.apache.kafka.streams.errors.StreamsException: ClassCastException
>>> while producing data to topic
>>> simple-test-KSTREAM-MAP-0000000006-repartition. A serializer (key:
>>> org.apache.kafka.common.serialization.IntegerSerializer / value:
>>> org.apache.kafka.common.serialization.ByteArraySerializer) is not
>>> compatible to the actual key or value type (key type:
>>> java.lang.Integer / value type: com.zzzzzz.events.model.A). Change the
>>> default Serdes in StreamConfig or provide correct Serdes via method
>>> parameters (for example if using the DSL, `#to(String topic,
>>> Produced<K, V> produced)` with
>>> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
>>>
>>> so I assume the `Joined.with(...)` arguments didn't work, and if I add
>>> this property:
>>>
>>> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>>> ASerdes.class);
>>>
>>> I get a "NullPointerException" error, from RecordCollectorImpl.java,
>>> where valueSerializer is null:
>>>
>>> @Override
>>> public <K, V> void send(final String topic,
>>>                         final K key,
>>>                         final V value,
>>>                         final Headers headers,
>>>                         final Integer partition,
>>>                         final Long timestamp,
>>>                         final Serializer<K> keySerializer,
>>>                         final Serializer<V> valueSerializer) {
>>>     checkForException();
>>>
>>>     final byte[] keyBytes;
>>>     final byte[] valBytes;
>>>     try {
>>>         keyBytes = keySerializer.serialize(topic, headers, key);
>>>         valBytes = valueSerializer.serialize(topic, headers, value);
>>>                                  ^----- NullPointerException
>>>
>>>
>>> I also found a workaround: send the stream to a topic right after the
>>> `map` operator that rekeys, then load that topic into a new stream and
>>> use that one for the left join.
>>> That trick worked, but I'm pretty sure there is a more elegant way to
>>> do this.
>>>
>>> Thanks folks, any help is really appreciated.