[ https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matej Sprysl updated KAFKA-16432:
---------------------------------
    Description: 
h2. Code:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<Long, Long> table = builder.globalTable("tableTopic",
    Consumed.with(Serdes.Long(), Serdes.Long()));
final StreamsBuilder builder2 = new StreamsBuilder();
final KStream<byte[], Message> stream =
    builder2.stream(
        Pattern.compile("streamTopic"),
        Consumed.with(Serdes.ByteArray(), Message.SERDE));
(...some processing, no state store added...)
final var joiner = new MyJoiner();
final var keyMapper = new MyKeyMapper();
final KStream<byte[], Message> enriched =
    messages
        .join(table, keyMapper, joiner, Named.as("innerJoin"));
{code}
h2. Error:
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor innerJoin has no access to StateStore tableTopic-STATE-STORE-0000000000 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
{code}
h2. Additional notes:
This error happens when I try to join a KStreams instance with a GlobalKTable instance.
It is important to emphasize that I am not connecting any state store manually.

  was:
h2. Code:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<Long, Long> table = builder.globalTable("tableTopic",
    Consumed.with(Serdes.Long(), Serdes.Long()));
final KStream<byte[], Message> stream =
    builder.stream(
        Pattern.compile("streamTopic"),
        Consumed.with(Serdes.ByteArray(), Message.SERDE));
(...some processing, no state store added...)
final var joiner = new MyJoiner();
final var keyMapper = new MyKeyMapper();
final KStream<byte[], Message> enriched =
    messages
        .join(table, keyMapper, joiner, Named.as("innerJoin"));
{code}
h2. Error:
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor innerJoin has no access to StateStore tableTopic-STATE-STORE-0000000000 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
{code}
h2. Additional notes:
This error happens when I try to join a KStreams instance with a GlobalKTable instance.
It is important to emphasize that I am not connecting any state store manually.


> KStreams: Joining KStreams and GlobalKTable requires a state store
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16432
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16432
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Matej Sprysl
>            Priority: Major
>
> h2. Code:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final GlobalKTable<Long, Long> table = builder.globalTable("tableTopic",
>     Consumed.with(Serdes.Long(), Serdes.Long()));
> final StreamsBuilder builder2 = new StreamsBuilder();
> final KStream<byte[], Message> stream =
>     builder2.stream(
>         Pattern.compile("streamTopic"),
>         Consumed.with(Serdes.ByteArray(), Message.SERDE));
> (...some processing, no state store added...)
> final var joiner = new MyJoiner();
> final var keyMapper = new MyKeyMapper();
> final KStream<byte[], Message> enriched =
>     messages
>         .join(table, keyMapper, joiner, Named.as("innerJoin"));
> {code}
> h2. Error:
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor
> innerJoin has no access to StateStore tableTopic-STATE-STORE-0000000000 as
> the store is not connected to the processor. If you add stores manually via
> '.addStateStore()' make sure to connect the added store to the processor by
> providing the processor name to '.addStateStore()' or connect them via
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name
> to '.process()', '.transform()', or '.transformValues()' to connect the store
> to the corresponding operator, or they can provide a StoreBuilder by
> implementing the stores() method on the Supplier itself. If you do not add
> stores manually, please file a bug report at
> https://issues.apache.org/jira/projects/KAFKA.
> {code}
> h2. Additional notes:
> This error happens when I try to join a KStreams instance with a GlobalKTable
> instance.
> It is important to emphasize that I am not connecting any state store
> manually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)