Thanks Matthias for your help. I was able to find the issue: it was actually my Serdes definition. This kind of problem is very hard to track down, the execution trace logs are not very helpful, and the documentation on custom serdes is not that great.
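
For anyone who finds this thread later: judging by the ClassCastException quoted below, a serializer seems to be checked against the actual value type only when a record is produced, not when the topology is built. Here is a toy, stdlib-only Java sketch of that failure mode. `ToySerializer`, `ToyByteArraySerializer`, `Event` and `SerdeMismatchDemo` are hypothetical names made up for illustration; they are not Kafka classes, and the sketch only assumes that a framework takes a configured serializer and casts it to the generic type it expects.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer interface; NOT the Kafka class.
interface ToySerializer<T> {
    byte[] serialize(String topic, T data);
}

// Plays the role of a byte-array default serializer: passes bytes through.
final class ToyByteArraySerializer implements ToySerializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}

// Hypothetical value type, standing in for a domain class.
final class Event {
    final int refB;

    Event(int refB) {
        this.refB = refB;
    }
}

final class SerdeMismatchDemo {
    // Mimics a framework that takes the serializer from configuration and
    // casts it to the type it hopes is right. Generics are erased at runtime,
    // so the cast itself always succeeds.
    @SuppressWarnings("unchecked")
    static <V> byte[] send(String topic, V value, ToySerializer<?> configured) {
        ToySerializer<V> serializer = (ToySerializer<V>) configured;
        // A mismatch only surfaces here, when the value reaches the serializer.
        return serializer.serialize(topic, value);
    }

    // Returns "ok:<length>" on success, or the name of the failure.
    static String describeSend(Object value) {
        try {
            byte[] bytes = send("toy-repartition", value, new ToyByteArraySerializer());
            return "ok:" + bytes.length;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeSend("abc".getBytes(StandardCharsets.UTF_8)));
        System.out.println(describeSend(new Event(7)));
    }
}
```

Running `main` prints `ok:3` and then `ClassCastException`. The compiler accepts both calls, and the mismatch only shows up once an `Event` is handed to the byte-array serializer.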
Thanks for the tip about needing to materialize tables; I had no idea about the "push down" process. Do you know where I can find some documentation on that process?

On Tue, Jun 8, 2021 at 5:54 PM Matthias J. Sax wrote:
>
> Can you create a ticket? There is at least one bug I found looking into
> the code. (At least in `trunk` -- what version are you using? -- we
> might have fixed a few bugs in newer versions -- I could not reproduce
> all issues you report).
>
>
> Btw: you will actually need to set the value serde via
>
> .toTable(Materialized.with(null, serdesB))
>
> The table needs to be materialized for the join, and thus, you need to
> specify the serde for `B.class`.
>
> The serde specified via `Joined` does not apply atm to the upstream
> table materialization -- Kafka Streams does "push down" serdes from
> upstream to downstream nodes, but not (yet) in the other direction.
>
> (`Joined` is also used for stream-stream joins and `otherValueSerde` is
> used there -- for stream-table joins, `otherValueSerde` is ignored atm.)
>
> Thus, after mapValues() that changes the value type, we don't have a
> serde at hand and fall back to the config.
>
>
> -Matthias
>
> On 6/7/21 8:02 PM, Richard Rossel wrote:
> > Hi List,
> > There is a Join operation between a stream that is marked for
> > repartition and a KTable that I can't make work.
> > The same left join works fine when I don't apply the Map operator (for
> > changing the key), so I must be doing something wrong with Serdes.
> >
> > The topology is:
> >
> > 1) Loading a KTable from topic B <Integer,String> :
> >
> > final KTable<Integer, B> ktableB = builder
> >     .stream(topicB, Consumed.with(integerSerde, stringSerde))
> >     .mapValues(v -> gson.fromJson(v, B.class)).toTable();
> >
> > 2) Loading a KStream from topic A <Integer,String>
> >
> > builder
> >     .stream(topicA, Consumed.with(integerSerde, stringSerde))
> >     .mapValues(v -> gson.fromJson(v, A.class))
> >
> > 3) new key on streamA to expose the field with reference to B Ktable
> >
> >     .map((k,v)-> new KeyValue<Integer, A>(v.refB, v))
> >
> > 4) now that stream and table share same ID, I do a left join
> >
> >     .leftJoin(signalKtable, joiner, Joined.with(
> >         Serdes.Integer(),
> >         serdesA, //serder for Value object in KStream
> >         serdesB)) // serdes for Value object in KTable
> >
> > 5) deserialized value to json and send to output
> >     .mapValues(v->v.toString())
> >     .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), stringSerde));
> >
> >
> > First error I got:
> >
> > org.apache.kafka.streams.errors.StreamsException: ClassCastException
> > while producing data to topic
> > simple-test-KSTREAM-MAP-0000000006-repartition. A serializer (key:
> > org.apache.kafka.common.serialization.IntegerSerializer / value:
> > org.apache.kafka.common.serialization.ByteArraySerializer) is not
> > compatible to the actual key or value type (key type:
> > java.lang.Integer / value type: com.zzzzzz.events.model.A). Change the
> > default Serdes in StreamConfig or provide correct Serdes via method
> > parameters (for example if using the DSL, `#to(String topic,
> > Produced<K, V> produced)` with
> > `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> >
> > so I assume the "Joined.with(" argument didn't work, and if I add this
> > property:
> >
> > streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >     ASerdes.class);
> >
> > I get a "NullPointerException" error, from RecordCollectorImpl.java,
> > where valueSerializer is null:
> >
> > @Override
> > public <K, V> void send(final String topic,
> >                         final K key,
> >                         final V value,
> >                         final Headers headers,
> >                         final Integer partition,
> >                         final Long timestamp,
> >                         final Serializer<K> keySerializer,
> >                         final Serializer<V> valueSerializer) {
> >     checkForException();
> >
> >     final byte[] keyBytes;
> >     final byte[] valBytes;
> >     try {
> >         keyBytes = keySerializer.serialize(topic, headers, key);
> >         valBytes = valueSerializer.serialize(topic, headers, value);
> >                    ^----- NullPointerException
> >
> >
> > I also figured out a way to solve this by sending the Stream to a
> > topic right after the "map" operator for rekeying,
> > and then loading that new topic into a new Stream, and using that for
> > applying the left join.
> > That trick worked, but I'm pretty sure there is a more elegant way to do
> > this.
> >
> > Thanks folks, any help is really appreciated.
> >

-- 
Richard Rossel
Atlanta - GA
