[ https://issues.apache.org/jira/browse/KAFKA-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011426#comment-17011426 ]
Andy Bryant commented on KAFKA-9390:
------------------------------------

Here's the code snippet. I'm joining the orders table to the customers table using the customer key as the foreign key.

{code:java}
final KTable<dbserver1.inventory.customers.Key, dbserver1.inventory.customers.Value> customersTable =
    CdcHelpers.valueTable(streams.customers(), "customersTable");
final KTable<dbserver1.inventory.orders.Key, dbserver1.inventory.orders.Value> ordersTable =
    CdcHelpers.valueTable(streams.orders(), "ordersTable");
final KTable<dbserver1.inventory.orders.Key, OrderView> enrichedOrders = ordersTable.join(
    customersTable,
    order -> new dbserver1.inventory.customers.Key(order.getPurchaser()),
    (order, customer) -> OrderView.newBuilder()
        .setCustomerName(customer.getFirstName() + ' ' + customer.getLastName())
        .setOrderId(order.getOrderNumber())
        .setOrderDate(LocalDate.fromDateFields(Date.valueOf(java.time.LocalDate.ofEpochDay(order.getOrderDate()))))
        .setOrderQuantity(order.getQuantity())
        .setProductName("unknown")
        .setProductWeight(0.0d)
        .build(),
    Named.as("wtf"),
    AvroSerdes.materializedAs("ordersWithCustomerxx"));
{code}

> Non-key joining of KTable not compatible with confluent avro serdes
> -------------------------------------------------------------------
>
> Key: KAFKA-9390
> URL: https://issues.apache.org/jira/browse/KAFKA-9390
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Andy Bryant
> Priority: Major
>
> I was trying out the new one-to-many KTable joins against some CDC data in
> Avro format and kept getting serialisation errors.
>
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
> {"type":"record","name":"Key","namespace":"dbserver1.inventory.orders","fields":[{"name":"order_number","type":"int"}],"connect.name":"dbserver1.inventory.orders.Key"}
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
> {code}
>
> Both tables have Avro keys of different types (one is an order key, the other
> a customer key).
> This looks like the cause:
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java#L57-L60]
> Both serdes will attempt to register schemas under the same subject in the
> schema registry, which fails the backward-compatibility check.
> I also noticed some subjects in the schema registry that didn't have the
> application-id prefix. This is probably caused by:
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java#L88]
> where {{repartitionTopicName}} doesn't have the application prefix.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
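The 409 in the stack trace follows from how subjects are derived: with Confluent's default TopicNameStrategy the key subject is simply the topic name plus "-key", and the foreign-key join serialises both the orders key and the customers key through the same internal subscription topic. A minimal stdlib-only sketch of the collision (class, helper names, and the topic name are made up for illustration; the map is a toy stand-in for the registry's compatibility check, not the real Schema Registry API):

```java
import java.util.HashMap;
import java.util.Map;

public class SubjectCollisionSketch {

    // Confluent's default TopicNameStrategy: key subject = topic name + "-key".
    static String keySubject(String topic) {
        return topic + "-key";
    }

    // Toy stand-in for the registry: a subject that already holds one schema
    // rejects a structurally different schema (the real check returns HTTP 409).
    static final Map<String, String> registry = new HashMap<>();

    static boolean register(String subject, String schemaFullName) {
        String existing = registry.putIfAbsent(subject, schemaFullName);
        return existing == null || existing.equals(schemaFullName);
    }

    public static void main(String[] args) {
        // Hypothetical internal subscription topic name for the FK join.
        String topic = "app-id-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic";

        // Both key types end up on the same subject, so the second registration fails.
        boolean first  = register(keySubject(topic), "dbserver1.inventory.orders.Key");
        boolean second = register(keySubject(topic), "dbserver1.inventory.customers.Key");

        System.out.println(first);   // true  - first schema registers fine
        System.out.println(second);  // false - different schema, same subject -> 409
    }
}
```

Registering two unrelated record types under one subject can never pass a BACKWARD (or any non-NONE) compatibility mode, which is why per-topic compatibility overrides don't help here; the fix has to separate the subjects.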