Matthias J. Sax created KAFKA-6398:
--------------------------------------

             Summary: Stream-Table join fails, if table is not materialized
                 Key: KAFKA-6398
                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.1
            Reporter: Matthias J. Sax


Using a non-materialized KTable in a stream-table join fails:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...);
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

fails with
{noformat}
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: StateStore null is not added yet.

        at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
        at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
        at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
{noformat}

Adding a store name is not sufficient as workaround but fails differently:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(..., 
"STORE-NAME");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

error:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor KSTREAM-JOIN-0000000005

        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
topology building: Processor KSTREAM-JOIN-0000000005 has no access to 
StateStore KTABLE-SOURCE-STATE-STORE-0000000000
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
        at 
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
        at 
org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
        at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
{noformat}

One can workaround by piping the result through a topic:
{noformat}
final KTable filteredKTable = 
builder.table("table-topic").filter(...).through("TOPIC");;
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to