[ https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119771#comment-17119771 ]

John Roesler commented on KAFKA-10049:
--------------------------------------

Woah, thanks for digging into that, [~abellemare] !

That's 100% the bug, and it is a regression. It seems I'm doomed to be haunted 
forever by my own code. This bug probably seems familiar because there have 
been about half a dozen bugs around the serdes for this feature. I don't think 
anyone could have predicted that the serdes would wind up being the most 
complex part of foreign-key joins.
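To see the failure mode in miniature, here is a self-contained sketch of what goes wrong (the names below are illustrative stand-ins, not the real Streams internals): a wrapper serializer that is handed the configured default *value* serializer will blow up with exactly this ClassCastException as soon as it is asked to serialize a String primary key.

```java
import java.nio.charset.StandardCharsets;

public class WrongSerdeSketch {
    // Stand-in for the real Serializer interface (illustrative only).
    interface Serializer<T> { byte[] serialize(T data); }

    // Mimics a JSON serde that casts to its own base type, like JSONSerdeComp.
    static class Pojo { String symbol = "AAPL"; }
    static class PojoSerializer implements Serializer<Object> {
        public byte[] serialize(Object data) {
            Pojo p = (Pojo) data;  // throws ClassCastException for a String key
            return ("{\"symbol\":\"" + p.symbol + "\"}").getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for the subscription wrapper: it (wrongly) delegates the
    // primary key to whatever the default *value* serializer happens to be.
    static class SubscriptionWrapper {
        final Serializer<Object> inner;
        SubscriptionWrapper(Serializer<Object> defaultValueSerializer) {
            this.inner = defaultValueSerializer;
        }
        byte[] serializeKey(Object primaryKey) {
            return inner.serialize(primaryKey);
        }
    }

    public static void main(String[] args) {
        SubscriptionWrapper wrapper = new SubscriptionWrapper(new PojoSerializer());
        try {
            wrapper.serializeKey("order-42");  // the primary key is a String
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The same delegation works fine when the key type matches the serde's expected type, which is why the bug only surfaces for users whose default value serde differs from the key type.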

The workflow is very fuzzy, because the "default serde" thing makes everything 
confusing. Hopefully we can fix it sometime soon.

In the meantime, I think the safest thing to do is just let the wrapper serde 
itself decide whether it should wrap the key or the value serde, since that 
choice is always fixed by the type of the wrapper serde.
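A sketch of that idea (again with illustrative stand-in names, not the code in the actual PR): since the wrapper only ever serializes primary keys, it can be constructed directly from the key serializer, so there is no way to hand it the value serde by mistake.

```java
import java.nio.charset.StandardCharsets;

public class FixedWrapperSketch {
    // Stand-in for the real Serializer interface (illustrative only).
    interface Serializer<T> { byte[] serialize(T data); }

    static class StringSerializer implements Serializer<String> {
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // The wrapper's payload is always a primary key, so the type of the
    // wrapper fixes the choice: it takes the *key* serializer at construction.
    static class SubscriptionWrapperSerializer<K> {
        private final Serializer<K> keySerializer;
        SubscriptionWrapperSerializer(Serializer<K> keySerializer) {
            this.keySerializer = keySerializer;
        }
        byte[] serialize(K primaryKey) {
            return keySerializer.serialize(primaryKey);
        }
    }

    public static void main(String[] args) {
        SubscriptionWrapperSerializer<String> s =
                new SubscriptionWrapperSerializer<>(new StringSerializer());
        // A String primary key now serializes cleanly, regardless of the
        // configured default value serde.
        System.out.println(s.serialize("order-42").length);
    }
}
```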

What do you think about: [https://github.com/apache/kafka/pull/8756] ?

Thanks for your help,

-John

> KTable-KTable Foreign Key join throwing Serialization Exception 
> ----------------------------------------------------------------
>
>                 Key: KAFKA-10049
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10049
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0, 2.6.0
>            Reporter: Amit Chauhan
>            Assignee: John Roesler
>            Priority: Blocker
>
>  I want to make use of the _KTable-KTable_ foreign-key join feature released 
> in *_2.5.0_*, but I am facing an issue while running the code. 
> {code:java}
>  public static void main(String[] args) {
>      Properties props = new Properties();
>      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
>      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
>      props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>      StreamsBuilder builder = new StreamsBuilder();
>      KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
>      KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
>      KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject::getSymbol,
>              new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
>                  @Override
>                  public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
>                      EnrichedOrder enOrder = EnrichedOrder.builder()
>                              .orderId(order.getOrderId())
>                              .execPrice(order.getPrice())
>                              .symbol(order.getSymbol())
>                              .quanity(order.getQuanity())
>                              .side(order.getSide())
>                              .filledQty(order.getFilledQty())
>                              .leaveQty(order.getLeaveQty())
>                              .index(order.getIndex())
>                              .vWaprelative(order.getVWaprelative())
>                              .stockAsk(stock != null ? stock.getAsk().doubleValue() : 0.0)
>                              .stockBid(stock != null ? stock.getBid().doubleValue() : 0.0)
>                              .stockLast(stock != null ? stock.getLast().doubleValue() : 0.0)
>                              .stockClose(stock != null ? stock.getClose().doubleValue() : 0.0)
>                              .build();
>                      return enOrder;
>                  }
>              }, Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>      enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() {
>          @Override
>          public void apply(String arg0, EnrichedOrder arg1) {
>              logger.info(String.format("key = %s, value = %s", arg0, arg1));
>          }
>      });
>      KafkaStreams streams = new KafkaStreams(builder.build(), props);
>      streams.start();
>      Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
>  }
> {code}
> {code:xml}
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-clients</artifactId>
>         <version>2.5.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-streams</artifactId>
>         <version>2.5.0</version>
>     </dependency>
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-0000000000: 
>     org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced<K, V> produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
>         at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [kafka-streams-2.5.0.jar:?]
>     Caused by: java.lang.ClassCastException: java.lang.String cannot be cast 
> to com.messages.JSONSerdeCompatible
>         at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1) 
> ~[classes/:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
>  ~[kafka-clients-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
>  ~[kafka-streams-2.5.0.jar:?]
>         ... 34 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)