[ https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amit Chauhan updated KAFKA-10049: --------------------------------- Description: I want to make use of the _KTable-KTable_ join feature released in *_2.5.0_*, but I am facing an issue while running the code. {code:java} public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass()); props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder = new StreamsBuilder(); KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora); KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data); KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() { @Override public EnrichedOrder apply(OrderObject order, StockMarketData stock) { EnrichedOrder enOrder = EnrichedOrder.builder() .orderId(order.getOrderId()) .execPrice(order.getPrice()) .symbol(order.getSymbol()) .quanity(order.getQuanity()) .side(order.getSide()) .filledQty(order.getFilledQty()) .leaveQty(order.getLeaveQty()) .index(order.getIndex()) .vWaprelative(order.getVWaprelative()) .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0) .stockBid(stock!=null?stock.getBid().doubleValue():0.0) .stockLast(stock!=null?stock.getLast().doubleValue():0.0) .stockClose(stock!=null?stock.getClose().doubleValue():0.0) .build(); return enOrder; } } , Materialized.with(Serdes.String(), new JSONSerdeComp<>())); enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{ @Override public void apply(String arg0, EnrichedOrder arg1) { 
logger.info(String.format("key = %s, value = %s", arg0, arg1)); } }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close())); }}} <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency>}} {code} was: {code} {{public static void main(String[] args) \{ Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass()); props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder = new StreamsBuilder(); KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora); KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data); KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() { @Override public EnrichedOrder apply(OrderObject order, StockMarketData stock) { EnrichedOrder enOrder = EnrichedOrder.builder() .orderId(order.getOrderId()) .execPrice(order.getPrice()) .symbol(order.getSymbol()) .quanity(order.getQuanity()) .side(order.getSide()) .filledQty(order.getFilledQty()) .leaveQty(order.getLeaveQty()) .index(order.getIndex()) .vWaprelative(order.getVWaprelative()) .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0) .stockBid(stock!=null?stock.getBid().doubleValue():0.0) 
.stockLast(stock!=null?stock.getLast().doubleValue():0.0) .stockClose(stock!=null?stock.getClose().doubleValue():0.0) .build(); return enOrder; } } , Materialized.with(Serdes.String(), new JSONSerdeComp<>())); enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{ @Override public void apply(String arg0, EnrichedOrder arg1) { logger.info(String.format("key = %s, value = %s", arg0, arg1)); } }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close())); }}} {{}} {{<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency>}}{{}} {code} Summary: KTable-KTable Foreign Key join throwing Serialization Exception (was: KTable-KTable Foreign ) > KTable-KTable Foreign Key join throwing Serialization Exception > ---------------------------------------------------------------- > > Key: KAFKA-10049 > URL: https://issues.apache.org/jira/browse/KAFKA-10049 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.5.0 > Reporter: Amit Chauhan > Priority: Blocker > > I want to make use of the _KTable-KTable_ join feature released in *_2.5.0_*, but > I am facing an issue while running the code. 
> {code:java} > > public static void main(String[] args) { > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application-2"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new > JSONSerdeComp<>().getClass()); > props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp"); > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > StreamsBuilder builder = new StreamsBuilder(); > KTable<String, OrderObject> ordersTable = builder.<String, > OrderObject>table(TOPIC_Agora); > KTable<String, StockMarketData> stockTable = builder.<String, > StockMarketData>table(TOPIC_Stock_Data); > KTable<String, EnrichedOrder> enriched = > ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new > ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() { > @Override > public EnrichedOrder apply(OrderObject order, StockMarketData > stock) { > EnrichedOrder enOrder = EnrichedOrder.builder() > .orderId(order.getOrderId()) > .execPrice(order.getPrice()) > .symbol(order.getSymbol()) > .quanity(order.getQuanity()) > .side(order.getSide()) > .filledQty(order.getFilledQty()) > .leaveQty(order.getLeaveQty()) > .index(order.getIndex()) > .vWaprelative(order.getVWaprelative()) > > .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0) > > .stockBid(stock!=null?stock.getBid().doubleValue():0.0) > > .stockLast(stock!=null?stock.getLast().doubleValue():0.0) > > .stockClose(stock!=null?stock.getClose().doubleValue():0.0) > .build(); > return enOrder; > } > } , Materialized.with(Serdes.String(), new JSONSerdeComp<>())); > enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{ > @Override > public void apply(String arg0, EnrichedOrder arg1) { > logger.info(String.format("key = %s, value = %s", arg0, arg1)); > } > }); > KafkaStreams streams = 
new KafkaStreams(builder.build(), props); > streams.start(); > Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close())); > }}} > > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-clients</artifactId> > <version>2.5.0</version> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-streams</artifactId> > <version>2.5.0</version> > </dependency>}} > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)