Hi List,
I have a join between a stream that is marked for repartition and a
KTable, and I can't make it work.
The same left join works fine when I don't apply the map operator (for
changing the key), so I must be doing something wrong with the Serdes.
The topology is:
1) Loading a KTable from topic B <Integer,String> :
final KTable<Integer, B> ktableB = builder
    .stream(topicB, Consumed.with(integerSerde, stringSerde))
    .mapValues(v -> gson.fromJson(v, B.class))
    .toTable();
2) Loading a KStream from topic A <Integer,String>:
builder
    .stream(topicA, Consumed.with(integerSerde, stringSerde))
    .mapValues(v -> gson.fromJson(v, A.class))
3) Re-keying stream A by the field that references the B KTable:
    .map((k, v) -> new KeyValue<Integer, A>(v.refB, v))
4) Now that the stream and the table share the same key, I do a left join:
    .leftJoin(ktableB, joiner, Joined.with(
        Serdes.Integer(),
        serdesA,   // serde for the value object in the KStream
        serdesB))  // serde for the value object in the KTable
5) Serializing the value to a JSON string and sending it to the output topic:
    .mapValues(v -> v.toString())
    .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), stringSerde));
First error I got:
org.apache.kafka.streams.errors.StreamsException: ClassCastException
while producing data to topic
simple-test-KSTREAM-MAP-0000000006-repartition. A serializer (key:
org.apache.kafka.common.serialization.IntegerSerializer / value:
org.apache.kafka.common.serialization.ByteArraySerializer) is not
compatible to the actual key or value type (key type:
java.lang.Integer / value type: com.zzzzzz.events.model.A). Change the
default Serdes in StreamConfig or provide correct Serdes via method
parameters (for example if using the DSL, `#to(String topic,
Produced<K, V> produced)` with
`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
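As a side note, the mechanism behind this ClassCastException can be sketched without Kafka. The following is a minimal illustration under stated assumptions: `Serializer`, `ByteArraySerializer`, `ASerializer`, `A`, and `send` are simplified hypothetical stand-ins, not the real Kafka classes. It only shows that a serializer typed for byte[] fails at runtime when an unchecked generic cast lets it receive an object of type A, which is the situation the message above describes.

```java
import java.nio.charset.StandardCharsets;

final class SerdeMismatchDemo {

    /** Hypothetical, simplified stand-in for a Serializer<T> interface. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for a byte-array serializer: passes byte[] through unchanged. */
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    /** Stand-in for the value class A from the post (only refB is modelled). */
    static final class A {
        final int refB;

        A(int refB) {
            this.refB = refB;
        }
    }

    /** Hypothetical serializer for A; writes refB as UTF-8 text. */
    static final class ASerializer implements Serializer<A> {
        @Override
        public byte[] serialize(String topic, A data) {
            return Integer.toString(data.refB).getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Mimics a framework that holds a configured serializer with erased type
     * information and applies it to whatever value flows through.
     */
    @SuppressWarnings("unchecked")
    static <V> byte[] send(String topic, V value, Serializer<?> configured) {
        Serializer<V> serializer = (Serializer<V>) configured; // unchecked cast
        return serializer.serialize(topic, value);
    }

    /** Returns true if sending an A through the given serializer throws ClassCastException. */
    static boolean failsWithClassCast(Serializer<?> configured) {
        try {
            send("some-repartition-topic", new A(7), configured);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

In this sketch, `failsWithClassCast(new ByteArraySerializer())` is true while `failsWithClassCast(new ASerializer())` is false, so the exception points at which serializer was in effect for the repartition topic rather than at the data itself.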
So I assume the "Joined.with(...)" argument was not applied. If I then add
this property:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
    ASerdes.class);
I get a NullPointerException from RecordCollectorImpl.java,
where valueSerializer is null:
@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Headers headers,
                        final Integer partition,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer) {
    checkForException();
    final byte[] keyBytes;
    final byte[] valBytes;
    try {
        keyBytes = keySerializer.serialize(topic, headers, key);
        valBytes = valueSerializer.serialize(topic, headers, value);
                   ^----- NullPointerException
I also found a workaround: sending the stream to a topic right after
the "map" operator that re-keys it, then loading that new topic into a
new stream and applying the left join to that stream.
That trick worked, but I'm pretty sure there is a more elegant way to do this.
Thanks folks, any help is really appreciated.
--
Richard Rossel
Atlanta - GA