[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032005#comment-17032005 ]

John Roesler commented on KAFKA-9517:
-------------------------------------

Aha! I think I've found it:

{noformat}
    java.lang.NullPointerException
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde.<init>(SubscriptionWrapperSerde.java:31)
        at org.apache.kafka.streams.kstream.internals.KTableImpl.doJoinOnForeignKey(KTableImpl.java:956)
        at org.apache.kafka.streams.kstream.internals.KTableImpl.join(KTableImpl.java:845)
        at org.apache.kafka.streams.TopologyTestDriverTest.shouldProduceOutputsInTheRightOrder(TopologyTestDriverTest.java:1533)
{noformat}

It's produced by the following test (which I'm working on for KAFKA-9503):

{noformat}
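import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.Test;

// ... inside TopologyTestDriverTest (package org.apache.kafka.streams, so StreamsBuilder,
// StreamsConfig, Topology, TopologyTestDriver, TestInputTopic and TestOutputTopic need no import) ...

    @Test
    public void shouldProduceOutputsInTheRightOrder() {
        final StreamsBuilder builder = new StreamsBuilder();
        // No Consumed is given, so the source tables' serdes are left null in the DSL.
        final KTable<String, String> aTable = builder.table("A");
        final KTable<String, String> bTable = builder.table("B");

        // Foreign-key join: the foreign key is the part of A's value before the first '-'.
        // Per the trace above, the NPE is thrown from this call.
        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")"
        );

        // Primary-key join of A with the foreign-key join result.
        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        // Build once and reuse the same Topology for describe() and the driver.
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> aTopic =
                topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> bTopic =
                topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
            aTopic.pipeInput("a1", "b1-alpha");
            // The "b1" record belongs to table B: its key matches the foreign key extracted from "b1-alpha".
            bTopic.pipeInput("b1", "beta");
            System.out.println(output.readKeyValuesToMap());
        }
    }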
{noformat}
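One reading of the trace: the foreign-key join appears to build its subscription serde eagerly, in a constructor, from the left table's key serde, which is still {{null}} here because {{builder.table("A")}} got no {{Consumed}}; the configured default serdes only seem to be applied later, when the processors are initialized. Below is a hypothetical, self-contained model of that pattern in plain Java. {{SimpleSerde}}, {{EagerWrapper}} and {{LazyWrapper}} are made-up names, not Kafka classes; the sketch only contrasts dereferencing a possibly-null serde at construction time with resolving a default at use time.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical model of the failure mode; none of these types are Kafka classes.
public class NullSerdeModel {

    /** Stand-in for a serde: it only exposes the name of its serializer. */
    interface SimpleSerde {
        String serializerName();
    }

    /** Eager wrapper: dereferences the inner serde in its constructor. */
    static final class EagerWrapper {
        private final String inner;

        EagerWrapper(final SimpleSerde keySerde) {
            // Throws NullPointerException right here if keySerde is null,
            // i.e. while the "topology" is still being built.
            this.inner = keySerde.serializerName();
        }

        String describe() {
            return "wrapper(" + inner + ")";
        }
    }

    /** Lazy wrapper: keeps a possibly-null serde and falls back to a default when used. */
    static final class LazyWrapper {
        private final SimpleSerde keySerde;

        LazyWrapper(final SimpleSerde keySerde) {
            this.keySerde = keySerde; // may be null; resolved in describe()
        }

        String describe(final Supplier<SimpleSerde> defaultSerde) {
            final SimpleSerde resolved = keySerde != null ? keySerde : defaultSerde.get();
            return "wrapper(" + Objects.requireNonNull(resolved).serializerName() + ")";
        }
    }

    public static void main(final String[] args) {
        // Plays the role of DEFAULT_KEY_SERDE_CLASS_CONFIG in the test above.
        final SimpleSerde configuredDefault = () -> "StringSerializer";

        try {
            new EagerWrapper(null);
        } catch (final NullPointerException e) {
            System.out.println("eager: NPE at construction");
        }

        System.out.println("lazy: " + new LazyWrapper(null).describe(() -> configuredDefault));
    }
}
```

Running {{main}} prints {{eager: NPE at construction}} followed by {{lazy: wrapper(StringSerializer)}}. Deferring the serde lookup is one plausible direction for a fix; this model does not describe how the actual code is, or will be, changed.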



> KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9517
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9517
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Paul Snively
>            Priority: Critical
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
> calls `doJoinOnForeignKey` with an argument of `Materialized.with(null, null)`, 
> as apparently do several other APIs. As the comment spanning 
> [these lines|#L1098-L1099] makes clear, the result is a `KTable` whose 
> `valueSerde` (as a `KTableImpl`) is `null`. Therefore, attempts to `join` etc. 
> on the resulting `KTable` fail with a `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.
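A minimal sketch of the workaround described in the quoted issue, applied to the topology from the test in the comment: serdes are passed explicitly both when creating the source tables ({{Consumed.with}}) and to each join ({{Materialized.with}}), so nothing in the join chain should depend on a {{null}} serde. It assumes the Kafka Streams 2.4 DSL overloads that take a {{Materialized}} argument and {{kafka-streams-test-utils}} on the classpath; the class name {{ExplicitSerdesWorkaround}} is made up. Whether the explicit {{Consumed}} serdes alone would avoid the NPE from the trace is not verified here.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// Hypothetical class name; the topology mirrors the test in the comment above.
public class ExplicitSerdesWorkaround {

    static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Explicit serdes on the source tables instead of relying on config defaults.
        final KTable<String, String> aTable =
            builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> bTable =
            builder.table("B", Consumed.with(Serdes.String(), Serdes.String()));

        // Foreign-key join with an explicit Materialized, so the result table carries serdes.
        final KTable<String, String> fkJoinResult = aTable.join(
            bTable,
            value -> value.split("-")[0],
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.with(Serdes.String(), Serdes.String())
        );

        // Primary-key join on the foreign-key join result, also with explicit serdes.
        final KTable<String, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
            Materialized.with(Serdes.String(), Serdes.String())
        );

        finalJoinResult.toStream().to("output");
        return builder.build();
    }

    public static void main(final String[] args) {
        final Topology topology = buildTopology();
        System.out.println(topology.describe());

        // Same config as the test above.
        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> aTopic =
                driver.createInputTopic("A", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> bTopic =
                driver.createInputTopic("B", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
            aTopic.pipeInput("a1", "b1-alpha");
            bTopic.pipeInput("b1", "beta");
            System.out.println(output.readKeyValuesToMap());
        }
    }
}
```

The printed output map is left unasserted on purpose: the ordering of join results in {{TopologyTestDriver}} is itself the subject of KAFKA-9503.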



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
