[
https://issues.apache.org/jira/browse/KAFKA-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044035#comment-17044035
]
Guozhang Wang commented on KAFKA-9598:
--------------------------------------
I think it maybe related to some old bugs that we fixed in trunk, [~sergem]
could you compile the latest trunk and run to see if it works?
> RocksDB exception when grouping dynamically appearing topics into a KTable
> ---------------------------------------------------------------------------
>
> Key: KAFKA-9598
> URL: https://issues.apache.org/jira/browse/KAFKA-9598
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.2.0, 2.4.0
> Reporter: Sergey Menshikov
> Priority: Major
> Attachments: exception-details.txt
>
>
> A streams application consumes a number of topics via a whitelisted regex.
> The topics appear dynamically, generated from dynamically appearing MongoDB
> collections by debezium MongoDB source driver.
> The development is running on debezium docker images (Debezium 0.9 and
> Debezium 1.0 -> Kafka 2.2.0 and 2.4.0), single instance of Kafka, Connect and
> the streams consumer app.
> As the MongoDB driver provides only deltas of the changes, to collect full
> record for each key, the code creates KTable which is then transformed into a
> KStream for further joining with other KTables and Global KTables.
> The following piece of code results in the exception when a new topic is
> added:
>
> {code:java}
> Pattern tResultPattern =
>
> Pattern.compile(config.getProperty("mongodb_source_prefix")+".tr[0-9a-fA-F]{32}");
> KStream<String, JsonNode> tResultsTempStream = builder.stream(tResultPattern,
> Consumed.with(stringSerde, jsonSerde));
> KTable<String, JsonNode> tResultsTempTable =
> tResultsTempStream.groupByKey(Grouped.with(stringSerde,jsonSerde))
> .reduce((aggValue, newValue) -> mergeNodes(aggValue,newValue)); //
> mergeNodes is a Json traverse/merger procedure
> KStream<String, JsonNode> tResults =
> tResultsTempTable.toStream();
>
> {code}
> kconsumer_1 | Exception in thread "split-reader-client3-StreamThread-1"
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store
> KSTREAM-REDUCE-STATE-STORE-0000000032 at location
> /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032
> ...
> kconsumer_1 | Caused by: org.rocksdb.RocksDBException: lock :
> /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032/LOCK:
> No locks available
> Kstore 10_0 contains tr[0-9a-fA-F] 32 records, I checked.
>
> more details about exception are in the attached file.
>
> The exception is no longer present when I use an intermediate topic instead:
>
>
> {code:java}
> Pattern tResultPattern =
>
> Pattern.compile(config.getProperty("mongodb_source_prefix")+".tr[0-9a-fA-F]{32}");
> KStream<String, JsonNode> tResultsTempStream = builder.stream(tResultPattern,
> Consumed.with(stringSerde, jsonSerde));
>
> tResultsTempStream.transform(trTransformer::new).to(config.getProperty("tr_intermediate_topic_name"),Produced.with(stringSerde,
> jsonSerde)); // trTransformer adds topic name into value Json, in previous
> snippet it was done in the pipeline after grouping/streaming
> KStream<String, JsonNode> tResultsTempStream2 =
> builder.stream(config.getProperty("tr_intermediate_topic_name"),
> Consumed.with(stringSerde, jsonSerde));
> KTable<String, JsonNode> tResultsTempTable =
> tResultsTempStream2.groupByKey(Grouped.with(stringSerde,jsonSerde))
> .reduce((aggValue, newValue) -> mergeNodes(aggValue,newValue));
> KStream<String, JsonNode> tResults =
> tResultsTempTable.toStream();
> {code}
>
>
> If making KTable from multiple whitelisted topics is something that is
> outside of scope of Kafka Streams, perhaps it would make sense to mention it
> in the docs.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)