Hi List,
I can't get a left join to work between a KStream that is marked for
repartition and a KTable.
The same left join works fine when I don't apply the map operator (to
change the key), so I must be doing something wrong with the Serdes.

The topology is:

1) Loading a KTable from topic B <Integer,String> :

final KTable<Integer, B> ktableB = builder
        .stream(topicB, Consumed.with(integerSerde, stringSerde))
        .mapValues(v -> gson.fromJson(v, B.class)).toTable();

2) Loading a KStream from topic A <Integer,String>

builder
        .stream(topicA, Consumed.with(integerSerde, stringSerde))
        .mapValues(v -> gson.fromJson(v, A.class))

3) Setting a new key on stream A, to expose the field that references the B KTable

        .map((k,v)-> new KeyValue<Integer, A>(v.refB, v))

4) Now that the stream and the table share the same key, I do a left join

        .leftJoin(ktableB, joiner, Joined.with(
                Serdes.Integer(),
                serdesA,  // serde for the value object in the KStream
                serdesB)) // serde for the value object in the KTable

5) Converting the joined value to a JSON string and sending it to the output topic

        .mapValues(v->v.toString())
        .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), stringSerde));


The first error I got:

org.apache.kafka.streams.errors.StreamsException: ClassCastException
while producing data to topic
simple-test-KSTREAM-MAP-0000000006-repartition. A serializer (key:
org.apache.kafka.common.serialization.IntegerSerializer / value:
org.apache.kafka.common.serialization.ByteArraySerializer) is not
compatible to the actual key or value type (key type:
java.lang.Integer / value type: com.zzzzzz.events.model.A). Change the
default Serdes in StreamConfig or provide correct Serdes via method
parameters (for example if using the DSL, `#to(String topic,
Produced<K, V> produced)` with
`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

So I assume the Joined.with(...) argument had no effect. If I add this property:

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
ASerdes.class);

I get a NullPointerException instead, thrown from RecordCollectorImpl.java,
where valueSerializer is null:

@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Headers headers,
                        final Integer partition,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer) {
    checkForException();

    final byte[] keyBytes;
    final byte[] valBytes;
    try {
        keyBytes = keySerializer.serialize(topic, headers, key);
        valBytes = valueSerializer.serialize(topic, headers, value);
                                 ^----- NullPointerException


I also found a workaround: send the stream to a topic right after the
map operator that rekeys it, load that topic into a new stream, and
apply the left join to that new stream.
That trick worked, but I'm pretty sure there is a more elegant way to do this.
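
For reference, the workaround can be sketched roughly as below. This is only
an illustration, not the exact code: the class name RekeyViaTopic, the method
rekeyAndLeftJoin, the refB function (standing in for v -> v.refB) and the
rekeyedTopic name are placeholders made up for this sketch. It assumes the
intermediate topic already exists and has the same number of partitions as
topic B.

```java
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;

public final class RekeyViaTopic {

    private RekeyViaTopic() { }

    /**
     * Rekeys streamA, writes it to an explicit intermediate topic, reads it
     * back as a new stream, and left-joins that new stream with tableB.
     * All names here are placeholders for illustration.
     */
    public static <A, B, R> KStream<Integer, R> rekeyAndLeftJoin(
            final StreamsBuilder builder,
            final KStream<Integer, A> streamA,
            final KTable<Integer, B> tableB,
            final Function<A, Integer> refB,      // extracts the reference to B from an A value
            final ValueJoiner<A, B, R> joiner,
            final String rekeyedTopic,            // hypothetical, pre-created topic
            final Serde<A> serdeA,
            final Serde<B> serdeB) {

        // Steps 2-3: rekey, then write with explicit serdes so the default
        // serdes are never used for the rekeyed records.
        streamA
                .map((k, v) -> new KeyValue<Integer, A>(refB.apply(v), v))
                .to(rekeyedTopic, Produced.with(Serdes.Integer(), serdeA));

        // Step 4: read the rekeyed topic back and join. The key was not
        // changed on this new stream, so no internal repartition topic is needed.
        return builder
                .stream(rekeyedTopic, Consumed.with(Serdes.Integer(), serdeA))
                .leftJoin(tableB, joiner,
                        Joined.with(Serdes.Integer(), serdeA, serdeB));
    }
}
```

The result of rekeyAndLeftJoin is then passed through mapValues and to(...)
as in step 5.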

Thanks folks, any help is really appreciated.

-- 
Richard Rossel
Atlanta - GA
