Glad you figured it out. If you think the docs should be improved, we take PRs :) -- In the end, it's a broader community effort to have good docs...
-Matthias

On 6/11/21 12:10 PM, Richard Rossel wrote:
> Thanks Matthias for your help,
> I was able to find the issue; it was actually my Serdes definition.
> Those type problems are very hard to find: the execution trace logs
> are not very helpful, and the documentation regarding custom Serdes
> is not that great.
>
> Thanks for the tip regarding the need to materialize tables; I had
> no idea about the "push down" process.
> Do you know where I can find some documentation regarding that process?
>
>
> On Tue, Jun 8, 2021 at 5:54 PM Matthias J. Sax <[email protected]> wrote:
>>
>> Can you create a ticket? There is at least one bug I found looking into
>> the code. (At least in `trunk` -- what version are you using? -- we
>> might have fixed a few bugs in newer versions -- I could not reproduce
>> all the issues you reported.)
>>
>>
>> Btw: you will actually need to set the value serde via
>>
>> .toTable(Materialized.with(null, serdesB))
>>
>> The table needs to be materialized for the join, and thus you need to
>> specify the serde for `B.class`.
>>
>> The serde specified via `Joined` does not apply atm to the upstream
>> table materialization -- Kafka Streams does "push down" serdes from
>> upstream to downstream nodes, but not (yet) in the other direction.
>>
>> (`Joined` is also used for stream-stream joins, and `otherValueSerde`
>> is used there -- for a stream-table join, `otherValueSerde` is
>> ignored atm.)
>>
>> Thus, after a mapValues() that changes the value type, we don't have a
>> serde at hand and fall back to the config.
>>
>>
>> -Matthias
>>
>> On 6/7/21 8:02 PM, Richard Rossel wrote:
>>> Hi List,
>>> There is a join operation between a stream that is marked for
>>> repartitioning and a KTable that I can't make work.
>>> The same left join works fine when I don't apply the map operator
>>> (for changing the key), so I must be doing something wrong with Serdes.
>>>
>>> The topology is:
>>>
>>> 1) Load a KTable from topic B <Integer,String>:
>>>
>>> final KTable<Integer, B> ktableB = builder
>>>     .stream(topicB, Consumed.with(integerSerde, stringSerde))
>>>     .mapValues(v -> gson.fromJson(v, B.class))
>>>     .toTable();
>>>
>>> 2) Load a KStream from topic A <Integer,String>:
>>>
>>> builder
>>>     .stream(topicA, Consumed.with(integerSerde, stringSerde))
>>>     .mapValues(v -> gson.fromJson(v, A.class))
>>>
>>> 3) Set a new key on stream A to expose the field that references the
>>> B KTable:
>>>
>>>     .map((k, v) -> new KeyValue<Integer, A>(v.refB, v))
>>>
>>> 4) Now that the stream and the table share the same key, do a left join:
>>>
>>>     .leftJoin(ktableB, joiner, Joined.with(
>>>         Serdes.Integer(),
>>>         serdesA,   // serde for the value object in the KStream
>>>         serdesB))  // serde for the value object in the KTable
>>>
>>> 5) Convert the value to a string and send it to the output topic:
>>>
>>>     .mapValues(v -> v.toString())
>>>     .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), stringSerde));
>>>
>>>
>>> The first error I got:
>>>
>>> org.apache.kafka.streams.errors.StreamsException: ClassCastException
>>> while producing data to topic
>>> simple-test-KSTREAM-MAP-0000000006-repartition. A serializer (key:
>>> org.apache.kafka.common.serialization.IntegerSerializer / value:
>>> org.apache.kafka.common.serialization.ByteArraySerializer) is not
>>> compatible to the actual key or value type (key type:
>>> java.lang.Integer / value type: com.zzzzzz.events.model.A). Change the
>>> default Serdes in StreamConfig or provide correct Serdes via method
>>> parameters (for example if using the DSL, `#to(String topic,
>>> Produced<K, V> produced)` with
>>> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
>>>
>>> So I assume the "Joined.with(" argument didn't work, and if I add this
>>> property:
>>>
>>> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>>>     ASerdes.class);
>>>
>>> I get a "NullPointerException" from RecordCollectorImpl.java,
>>> where valueSerializer is null:
>>>
>>> @Override
>>> public <K, V> void send(final String topic,
>>>                         final K key,
>>>                         final V value,
>>>                         final Headers headers,
>>>                         final Integer partition,
>>>                         final Long timestamp,
>>>                         final Serializer<K> keySerializer,
>>>                         final Serializer<V> valueSerializer) {
>>>     checkForException();
>>>
>>>     final byte[] keyBytes;
>>>     final byte[] valBytes;
>>>     try {
>>>         keyBytes = keySerializer.serialize(topic, headers, key);
>>>         valBytes = valueSerializer.serialize(topic, headers, value);
>>>         ^----- NullPointerException
>>>
>>>
>>> I also figured out a way to work around this by sending the stream to
>>> a topic right after the "map" operator that rekeys it, and then
>>> loading that new topic into a new stream and using that for the left
>>> join. That trick worked, but I'm pretty sure there is a more elegant
>>> way to do this.
>>>
>>> Thanks folks, any help is really appreciated.
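[Editor's note] Richard says his custom Serdes definition was the root cause. The thread does not show that definition, but a Gson-backed Serde of the kind being discussed could be sketched as below. This is a hypothetical implementation, not the one from the thread; it assumes `kafka-clients` and Gson on the classpath, and relies on `Serializer`/`Deserializer` being functional interfaces (single abstract method), so lambdas work:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import com.google.gson.Gson;

/** Hypothetical generic Serde that maps a POJO to/from its JSON bytes. */
public class GsonSerde<T> implements Serde<T> {

    private final Gson gson = new Gson();
    private final Class<T> type;

    public GsonSerde(final Class<T> type) {
        this.type = type;
    }

    @Override
    public Serializer<T> serializer() {
        // Null values must serialize to null (tombstones).
        return (topic, data) -> data == null
            ? null
            : gson.toJson(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, bytes) -> bytes == null
            ? null
            : gson.fromJson(new String(bytes, StandardCharsets.UTF_8), type);
    }
}
```

With such a class, `serdesA` and `serdesB` from the topology would be `new GsonSerde<>(A.class)` and `new GsonSerde<>(B.class)`.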
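[Editor's note] The manual workaround Richard describes (writing to a topic after the rekeying `map` and reading it back) can be expressed more directly in Kafka Streams 2.6+ via `KStream#repartition()`, which creates and manages the intermediate topic itself, with explicit serdes. A sketch under the thread's assumed names (`serdesA`, `ktableB`, `joiner`, `OUTPUT_TOPIC`):

```java
// After the key-changing map, repartition with explicit serdes so the
// repartition topic does not fall back to the default ByteArraySerde,
// then join against the materialized table.
streamA
    .map((k, v) -> new KeyValue<>(v.refB, v))
    .repartition(Repartitioned.with(Serdes.Integer(), serdesA))
    .leftJoin(ktableB, joiner)
    .mapValues(Object::toString)
    .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
```

On versions before 2.6, the explicit to()/stream() round trip Richard used is the standard way to achieve the same thing.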
