Matthias J. Sax created KAFKA-6398:
--------------------------------------

             Summary: Stream-Table join fails, if table is not materialized
                 Key: KAFKA-6398
                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.1
            Reporter: Matthias J. Sax
Using a non-materialized KTable in a stream-table join fails:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...);
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

This fails with:

{noformat}
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
    at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
    at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
{noformat}

Adding a store name is not a sufficient workaround; the join then fails differently:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

The resulting error:

{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
    at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
    at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
    at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
{noformat}

One can work around the problem by piping the result through a topic:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}
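The two stack traces suggest one mechanism: the join processor is connected only to the filtered table's own store name (null when the table is not materialized, "STORE-NAME" otherwise), while KTableFilter$KTableFilterValueGetter.init delegates to KTableSourceValueGetter.init, which reads KTABLE-SOURCE-STATE-STORE-0000000000. Below is a minimal, self-contained Java model of that mismatch, under stated assumptions. It does not use Kafka Streams, and every type and method in it (StoreAccessModel, JoinProcessor, ValueGetter, SourceGetter, FilterGetter, storeNames(), joinLookup) is hypothetical. Its third case connects the join to the stores the value-getter chain actually reads; that is one plausible fix direction, not necessarily the change made for this ticket.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model for illustration only: none of these types are Kafka Streams classes.
// A join processor may only read state stores it was connected to, and a non-materialized
// filter's value getter reads its parent's source store instead of a store of its own.
public class StoreAccessModel {

    static final String SOURCE_STORE = "KTABLE-SOURCE-STATE-STORE-0000000000";

    /** Stand-in for the join processor's store access checks. */
    static final class JoinProcessor {
        private final Map<String, Map<String, String>> registered;
        private final Set<String> connected = new HashSet<>();

        JoinProcessor(Map<String, Map<String, String>> registered) {
            this.registered = registered;
        }

        /** Build-time wiring; rejects null or unregistered store names. */
        void connect(String storeName) {
            if (storeName == null || !registered.containsKey(storeName)) {
                throw new IllegalStateException("StateStore " + storeName + " is not added yet.");
            }
            connected.add(storeName);
        }

        /** Run-time lookup; only stores connected at build time are readable. */
        Map<String, String> getStateStore(String name) {
            if (!connected.contains(name)) {
                throw new IllegalStateException("no access to StateStore " + name);
            }
            return registered.get(name);
        }
    }

    interface ValueGetter {
        Set<String> storeNames();                  // stores this getter actually reads
        String get(JoinProcessor processor, String key);
    }

    /** Getter of the source table: reads the source store directly. */
    static final class SourceGetter implements ValueGetter {
        public Set<String> storeNames() { return Set.of(SOURCE_STORE); }
        public String get(JoinProcessor processor, String key) {
            return processor.getStateStore(SOURCE_STORE).get(key);
        }
    }

    /** Getter of a filtered table: delegates to its parent, then re-applies the predicate. */
    static final class FilterGetter implements ValueGetter {
        private final ValueGetter parent;
        private final Predicate<String> predicate;
        FilterGetter(ValueGetter parent, Predicate<String> predicate) {
            this.parent = parent;
            this.predicate = predicate;
        }
        public Set<String> storeNames() { return parent.storeNames(); }
        public String get(JoinProcessor processor, String key) {
            String value = parent.get(processor, key);
            return (value != null && predicate.test(value)) ? value : null;
        }
    }

    // Wires a join against the filtered table, then looks up one key. connectGetterStores=false
    // connects only the table's own store name (possibly null); true connects the getter chain's
    // storeNames(). Returns the value (null if filtered out) or "ERROR: ..." on a wiring failure.
    public static String joinLookup(String key, boolean connectGetterStores, String tableStoreName) {
        Map<String, Map<String, String>> registered = new HashMap<>();
        registered.put(SOURCE_STORE, Map.of("k1", "a", "k2", "bb"));
        registered.put("STORE-NAME", new HashMap<>());
        JoinProcessor join = new JoinProcessor(registered);
        ValueGetter filtered = new FilterGetter(new SourceGetter(), v -> v.length() > 1);
        try {
            if (connectGetterStores) {
                for (String store : filtered.storeNames()) join.connect(store);
            } else {
                join.connect(tableStoreName);
            }
            return filtered.get(join, key);
        } catch (IllegalStateException e) {
            return "ERROR: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinLookup("k2", false, null));          // like the first trace
        System.out.println(joinLookup("k2", false, "STORE-NAME"));  // like the second trace
        System.out.println(joinLookup("k2", true, null));           // bb
        System.out.println(joinLookup("k1", true, null));           // null: "a" is filtered out
    }
}
```

Running main prints the two error messages (mirroring the non-materialized and named-store failures), then bb for k2 and null for k1, which the predicate filters out. The model connects stores by what the getter chain reads rather than by the table's declared name, which is why the missing-store and no-access errors both disappear in the last two cases.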