[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032005#comment-17032005 ]
John Roesler commented on KAFKA-9517:
-------------------------------------

Aha! I think I've found it:
{noformat}
java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde.<init>(SubscriptionWrapperSerde.java:31)
    at org.apache.kafka.streams.kstream.internals.KTableImpl.doJoinOnForeignKey(KTableImpl.java:956)
    at org.apache.kafka.streams.kstream.internals.KTableImpl.join(KTableImpl.java:845)
    at org.apache.kafka.streams.TopologyTestDriverTest.shouldProduceOutputsInTheRightOrder(TopologyTestDriverTest.java:1533)
{noformat}
produced with the following test (that I'm working on for KAFKA-9503):
{noformat}
public void shouldProduceOutputsInTheRightOrder() {
    final StreamsBuilder builder = new StreamsBuilder();

    final KTable<String, String> aTable = builder.table("A");
    final KTable<String, String> bTable = builder.table("B");

    final KTable<String, String> fkJoinResult = aTable.join(
        bTable,
        value -> value.split("-")[0],
        (aVal, bVal) -> "(" + aVal + "," + bVal + ")"
    );

    final KTable<String, String> finalJoinResult = aTable.join(
        fkJoinResult,
        (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
    );

    finalJoinResult.toStream().to("output");

    System.out.println(builder.build().describe());

    final Properties config = new Properties();
    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

    try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) {
        final TestInputTopic<String, String> aTopic =
            topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
        final TestInputTopic<String, String> bTopic =
            topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
        final TestOutputTopic<String, String> output =
            topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

        aTopic.pipeInput("a1", "b1-alpha");
        aTopic.pipeInput("b1", "beta");

        System.out.println(output.readKeyValuesToMap());
    }
}
{noformat}

> KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9517
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9517
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Paul Snively
>            Priority: Critical
>
> The `KTable` API implemented
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
> calls `doJoinOnForeignKey` with an argument of
> `Materialized.with(null, null)`, as apparently do several other APIs. As the
> comment spanning [these lines|#L1098-L1099] makes clear, the result is a
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore,
> attempts to `join` etc. on the resulting `KTable` fail with a
> `NullPointerException`.
>
> While there is an obvious workaround (explicitly construct the required
> `Materialized` and use the APIs that take it as an argument), I have to admit I
> find the existence of public APIs with this sort of bug, particularly when
> the bug is literally documented as a comment in the source code, astonishing
> to the point of incredulity. It calls the quality and trustworthiness of
> Kafka Streams into serious question, and if a resolution is not forthcoming
> within a week, we will be left with no other option but to consider technical
> alternatives.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
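
For readers skimming the thread, here is a toy Java model of the failure mode the issue description reports. It is only a sketch. `NullSerdeSketch`, `Serde`, and `Table` are hypothetical stand-ins, not Kafka Streams classes, and the null check is an assumed simplification of whatever the real join does with the serde. It shows a join without explicit serdes returning a table whose `valueSerde` is `null`, a further join on that table failing with a `NullPointerException`, and the workaround of supplying the serde explicitly.

```java
import java.util.Objects;

// Toy model only: every class here is a hypothetical stand-in, not a Kafka Streams type.
final class NullSerdeSketch {

    /** Stand-in for a serde; only its identity matters in this sketch. */
    static final class Serde {
        final String name;

        Serde(final String name) {
            this.name = name;
        }
    }

    /** Stand-in for a table that remembers the serde of its values (possibly null). */
    static final class Table {
        final Serde valueSerde;

        Table(final Serde valueSerde) {
            this.valueSerde = valueSerde;
        }

        /** Join without explicit serdes: the result carries a null value serde. */
        Table join(final Table other) {
            return join(other, null);
        }

        /** Join with an explicit result serde (the analogue of passing a Materialized). */
        Table join(final Table other, final Serde resultSerde) {
            // Assumed simplification: the join needs the other side's serde,
            // so a null one fails immediately instead of falling back to a default.
            Objects.requireNonNull(other.valueSerde, "valueSerde of the joined table is null");
            return new Table(resultSerde);
        }
    }

    public static void main(final String[] args) {
        final Serde stringSerde = new Serde("string");
        final Table a = new Table(stringSerde);
        final Table b = new Table(stringSerde);

        // First join succeeds, but its result has no value serde.
        final Table implicit = a.join(b);
        try {
            a.join(implicit);
        } catch (final NullPointerException e) {
            System.out.println("further join failed: " + e.getMessage());
        }

        // Workaround: supply the serde so later joins have something to use.
        final Table explicit = a.join(b, stringSerde);
        a.join(explicit);
        System.out.println("further join succeeded with explicit serde");
    }
}
```

In this model the second overload plays the role of the APIs that accept a `Materialized`. Whether the real fix should instead fall back to the configured default serdes is a separate question that the sketch does not address.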