[ https://issues.apache.org/jira/browse/KAFKA-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044035#comment-17044035 ]

Guozhang Wang commented on KAFKA-9598:
--------------------------------------

I think it may be related to some old bugs that we fixed in trunk. [~sergem], 
could you compile the latest trunk and run it to see if the issue goes away?

> RocksDB exception when grouping dynamically appearing topics into a KTable 
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-9598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9598
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.2.0, 2.4.0
>            Reporter: Sergey Menshikov
>            Priority: Major
>         Attachments: exception-details.txt
>
>
> A Streams application consumes a number of topics via a whitelisted regex. 
> The topics appear dynamically, generated from dynamically appearing MongoDB 
> collections by the Debezium MongoDB source connector.
> The development setup runs on Debezium Docker images (Debezium 0.9 and 
> Debezium 1.0 -> Kafka 2.2.0 and 2.4.0), with a single instance each of Kafka, 
> Connect and the Streams consumer app.
> Because the MongoDB connector provides only deltas of the changes, the code 
> builds a KTable to collect the full record for each key; the KTable is then 
> converted back into a KStream for further joins with other KTables and 
> GlobalKTables.
> The following piece of code results in the exception when a new topic is 
> added:
>  
> {code:java}
> Pattern tResultPattern = Pattern.compile(
>     config.getProperty("mongodb_source_prefix") + ".tr[0-9a-fA-F]{32}");
> 
> KStream<String, JsonNode> tResultsTempStream =
>     builder.stream(tResultPattern, Consumed.with(stringSerde, jsonSerde));
> 
> // mergeNodes is a JSON traverse/merge procedure
> KTable<String, JsonNode> tResultsTempTable =
>     tResultsTempStream.groupByKey(Grouped.with(stringSerde, jsonSerde))
>         .reduce((aggValue, newValue) -> mergeNodes(aggValue, newValue));
> 
> KStream<String, JsonNode> tResults = tResultsTempTable.toStream();
> {code}
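> mergeNodes, referenced above, is essentially a recursive deep merge of two 
> JsonNode values; a simplified sketch of what it does (not the exact 
> production code, assuming Jackson's ObjectNode):
> {code:java}
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> 
> import java.util.Iterator;
> import java.util.Map;
> 
> // Simplified sketch of mergeNodes: recursively merges newValue into aggValue
> public static JsonNode mergeNodes(JsonNode aggValue, JsonNode newValue) {
>     if (aggValue instanceof ObjectNode && newValue instanceof ObjectNode) {
>         ObjectNode merged = ((ObjectNode) aggValue).deepCopy();
>         Iterator<Map.Entry<String, JsonNode>> fields = newValue.fields();
>         while (fields.hasNext()) {
>             Map.Entry<String, JsonNode> field = fields.next();
>             JsonNode existing = merged.get(field.getKey());
>             // recurse into nested objects; otherwise the newer delta wins
>             merged.set(field.getKey(), existing == null
>                     ? field.getValue()
>                     : mergeNodes(existing, field.getValue()));
>         }
>         return merged;
>     }
>     return newValue; // non-object nodes: the newer delta replaces the old value
> }
> {code}
> The exception: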
> kconsumer_1 | Exception in thread "split-reader-client3-StreamThread-1" 
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> KSTREAM-REDUCE-STATE-STORE-0000000032 at location 
> /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032
> ...
> kconsumer_1 | Caused by: org.rocksdb.RocksDBException: lock : 
> /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032/LOCK: 
> No locks available
> The state store at 10_0 does contain records from the tr[0-9a-fA-F]{32} 
> topics; I checked.
>  
> More details about the exception are in the attached file.
>  
> The exception is no longer present when I use an intermediate topic instead:
>
> {code:java}
> Pattern tResultPattern = Pattern.compile(
>     config.getProperty("mongodb_source_prefix") + ".tr[0-9a-fA-F]{32}");
> 
> KStream<String, JsonNode> tResultsTempStream =
>     builder.stream(tResultPattern, Consumed.with(stringSerde, jsonSerde));
> 
> // trTransformer adds the topic name into the value JSON; in the previous
> // snippet this was done in the pipeline after grouping/streaming
> tResultsTempStream.transform(trTransformer::new)
>     .to(config.getProperty("tr_intermediate_topic_name"),
>         Produced.with(stringSerde, jsonSerde));
> 
> KStream<String, JsonNode> tResultsTempStream2 =
>     builder.stream(config.getProperty("tr_intermediate_topic_name"),
>         Consumed.with(stringSerde, jsonSerde));
> 
> KTable<String, JsonNode> tResultsTempTable =
>     tResultsTempStream2.groupByKey(Grouped.with(stringSerde, jsonSerde))
>         .reduce((aggValue, newValue) -> mergeNodes(aggValue, newValue));
> 
> KStream<String, JsonNode> tResults = tResultsTempTable.toStream();
> {code}
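> Roughly, trTransformer looks like the following simplified sketch (the 
> "source_topic" field name here is illustrative):
> {code:java}
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.kstream.Transformer;
> import org.apache.kafka.streams.processor.ProcessorContext;
> 
> // Simplified sketch of trTransformer: copies the source topic name into the
> // value JSON so it survives the hop through the intermediate topic
> public class trTransformer
>         implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
>     private ProcessorContext context;
> 
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>     }
> 
>     @Override
>     public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
>         ObjectNode enriched = ((ObjectNode) value).deepCopy();
>         // context.topic() is the topic of the record currently being processed
>         enriched.put("source_topic", context.topic());
>         return KeyValue.pair(key, enriched);
>     }
> 
>     @Override
>     public void close() { }
> }
> {code}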
>  
>  
> If building a KTable from multiple whitelisted topics is outside the scope of 
> Kafka Streams, perhaps it would make sense to mention that in the docs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
