Hi Sachin,

Streams apps can be configured with rocksdb.config.setter, which takes a
class that implements the
org.apache.kafka.streams.state.RocksDBConfigSetter interface. It can be
used to reduce the memory utilization of RocksDB.  Here's an example
class that trims it way down (note that this is Kotlin code, not Java):

package com.replicon.streamProcessing.timesheetList

import org.rocksdb.Options
import org.rocksdb.BlockBasedTableConfig

class RocksDBConfigSetter : org.apache.kafka.streams.state.RocksDBConfigSetter {
    // Default KS values are a 100 meg block cache and a 32 meg write buffer,
    // but that's awfully expensive on memory.  This trims it down to 2 megs
    // per store.

    private val WRITE_BUFFER_SIZE = 1 * 1024 * 1024L
    private val BLOCK_CACHE_SIZE = 1 * 1024 * 1024L
    private val BLOCK_SIZE = 4096L

    override fun setConfig(storeName: String, options: Options, configs: MutableMap<String, Any>) {
        val tableConfig = BlockBasedTableConfig()
        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE)
        tableConfig.setBlockSize(BLOCK_SIZE)

        options.setTableFormatConfig(tableConfig)
        options.setWriteBufferSize(WRITE_BUFFER_SIZE)
    }
}


I then reference that class in my settings before starting my KafkaStreams
instance (still Kotlin):

settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetter::class.java)




On Fri, Feb 10, 2017 at 12:13 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> We are running RocksDB with the default configuration.
> I will try monitoring RocksDB; I do see the beans when I connect via a
> JMX client.
>
> We use RocksDB for aggregation. Our pipeline is:
>
> input
> .groupByKey()
> .aggregate(new Initializer<SortedSet<T>>() {
>     public SortedSet<T> apply() {
>         return new TreeSet<T>();
>     }
> }, new Aggregator<String, T, SortedSet<T>>() {
>     public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate) {
>         aggregate.add(value);
>         return aggregate;
>     }
> }, TimeWindows.of(...).advanceBy(...).until(...), valueSerde, "key-table")
> .mapValues(new ValueMapper<SortedSet<T>, SortedSet<U>>() {
>     public SortedSet<U> apply(SortedSet<T> t) {
>         SortedSet<U> u = new TreeSet<U>();
>         ...
>         return u;
>     }
> })
> .foreach(new ForeachAction<Windowed<String>, SortedSet<U>>() {
>     public void apply(Windowed<String> key, SortedSet<U> nodes) {
>         ...
>     }
> });
>
> So I guess RocksDB is used only for the aggregate part.
>
> Is there some additional RocksDB setting that you would like us to try?
>
> Thanks
> Sachin
>
>
> On Fri, Feb 10, 2017 at 12:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Sachin,
> >
> > Thanks for sharing your observations; they are very helpful.
> >
> > Regarding monitoring: there are indeed metrics inside the Streams library
> > to meter state store operations; for RocksDB it records the average
> > latency and call rate for put / get / delete / all / range and flush /
> > restore. You can find the corresponding metric names in the monitoring
> > section of the web docs once 0.10.2 is out:
> >
> > https://kafka.apache.org/documentation/#monitoring
> >
> > Or you can just grep all metric names and see which ones are of interest
> > to you.
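> >
> > For example, a minimal Java sketch of that grep (assuming a running
> > KafkaStreams instance named "streams"; the instance exposes its metrics
> > registry via its metrics() method):
> >
> > import org.apache.kafka.common.MetricName;
> >
> > // Print every registered metric name so you can pick out the
> > // state store ones by their group and tags.
> > for (MetricName name : streams.metrics().keySet()) {
> >     System.out.println(name.group() + " / " + name.name() + " " + name.tags());
> > }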
> >
> >
> > Regarding the observations, I'm particularly interested in the fact that
> > "memory utilization keeps increasing". But since RocksDB uses both
> > on-heap and off-heap memory, it is hard to tell if there is really a
> > memory leak. Could you share your usage of the state stores (are they
> > used for aggregations / joins / something else), and did you override any
> > of the RocksDB config settings?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Feb 9, 2017 at 8:58 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > We recently upgraded to 0.10.2.0-rc0. The RocksDB issue seems to be
> > > resolved; however, we are just not able to get the streams going under
> > > our current scenario.
> > > The issue still seems to be related to RocksDB.
> > > Let me explain the situation:
> > >
> > > We have 40 partitions and 12 threads distributed across three machines
> > > (each having 4).
> > > Roughly 15 partitions get assigned to each machine. Thus we have 15
> > > state stores on each machine.
> > >
> > > What we see is that the streams app frequently gets rebalanced due to
> > > CommitFailedException, and as you have said, this should not happen once
> > > the application reaches a steady state.
> > > We are running an instance on a 4-core Linux box (virtual machine). What
> > > we observe is that the waiting time for each core is consistently
> > > greater than 50%. What we suspect is that the application is spending a
> > > lot of time on disk I/O, so the CPU keeps waiting. We also observe that
> > > the rate of consumption from the source topic falls over time, and that
> > > over time CPU utilization falls and memory utilization increases.
> > >
> > > Then what we did was use 12 partitions for the same topic and 12
> > > threads, so each thread processes a single partition and 4 state stores
> > > get created per machine (note the streams application code is identical
> > > to the above configuration).
> > > What we have observed so far here is a stable configuration. After a
> > > steady state is reached there are no rebalances, and the same thread
> > > continues processing from the same partition.
> > > We also see that the per-core wait time is fairly stable, around 10%.
> > >
> > > Finally, we have another configuration where a single-threaded streams
> > > application reads from an identical single-partition topic. Here too the
> > > application never fails. This is the most consistent configuration,
> > > running for months now. The per-core wait time is usually 0 - 3%.
> > >
> > > So from these experiments we feel that it is RocksDB that may be causing
> > > the state of the streams application to deteriorate over time. Are there
> > > any RocksDB metrics/logs we can collect, watch, and infer anything from?
> > > I doubt there is any memory leak (as some have reported), because the
> > > single-threaded application seems to run forever.
> > > However, is there any tuning anyone can recommend that will make RocksDB
> > > more optimized?
> > >
> > > Has anyone else faced similar issues where, with many RocksDB local
> > > state stores on a machine, the application does not perform as well as
> > > with a single state store?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Feb 7, 2017 at 3:21 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Sorry, I misunderstood what you said. You are running 3 instances, one
> > > > per machine? I thought you said you were running 3 instances on each
> > > > machine.
> > > >
> > > > Regarding partitions: you are better off having more partitions, as
> > > > this affects the maximum degree of parallelism you can achieve in the
> > > > app. If you only have 12 partitions then you can only have at most 12
> > > > threads in total. If you have 40 partitions then you can have up to 40
> > > > threads, and so on. It won't help with rebalancing anyway.
> > > >
> > > > You are correct, once all the instances are up and running then
> > > > rebalances generally shouldn't happen unless there is a failure of some
> > > > kind. To maintain membership of the consumer group, the consumer used
> > > > by streams needs to poll at least every max.poll.interval.ms - the
> > > > default for this is 5 minutes. If for any reason the processing of
> > > > records returned from a call to poll takes longer than
> > > > max.poll.interval.ms, then the consumer will be kicked out of the group
> > > > and a rebalance will happen. If you are seeing rebalances occurring
> > > > after everything has been up and running and reached a steady state,
> > > > you might want to increase the value of max.poll.interval.ms. You can
> > > > also adjust max.poll.records to decrease the maximum number of records
> > > > retrieved by a single call to poll - this will help to reduce the
> > > > processing time.
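> > > >
> > > > As a rough sketch (the values are illustrative only, not
> > > > recommendations; both are plain consumer configs, which Streams passes
> > > > through to its internal consumer):
> > > >
> > > > import java.util.Properties;
> > > > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > >
> > > > Properties props = new Properties();
> > > > // allow up to 10 minutes of processing between polls before the
> > > > // consumer is kicked out of the group (default is 5 minutes)
> > > > props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
> > > > // return at most 100 records per poll so each loop stays short
> > > > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);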
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Tue, 7 Feb 2017 at 07:24 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > Everything is understood and I will try out 0.10.2.0-rc0 shortly.
> > > > >
> > > > > However, one thing is not clear:
> > > > > "Firstly I'd recommend you have different state directory configs for
> > > > > each application instance."
> > > > >
> > > > > Well, I am running three separate instances of 4 threads each on
> > > > > three different machines.
> > > > > So each machine has its own physical directory structure, though the
> > > > > path to the state dir is the same for each, because that is the
> > > > > relative path where we have mounted the data directory separately on
> > > > > each of these three machines.
> > > > >
> > > > > So my streams state.dir setting is identical for all the instances,
> > > > > but physically the stores are located in different locations.
> > > > > So why I need to have a different config for each is not clear.
> > > > >
> > > > > I will also test with CLEANUP_DELAY_MS_CONFIG set to 30 minutes.
> > > > >
> > > > > Also, one thing I wanted to know: if I make the number of partitions
> > > > > equal to the total number of stream threads, which is 12, will that
> > > > > help so that one thread is always reading from a single partition and
> > > > > there is never a need to rebalance?
> > > > >
> > > > > However, I don't understand one thing: once a steady state has been
> > > > > reached and all threads have picked up their partitions, why is there
> > > > > ever a need for a future rebalance, unless and until a thread dies?
> > > > >
> > > > > Similarly, it is not clear how often a rebalance is triggered and
> > > > > whether there is a way we can control it.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 6, 2017 at 10:10 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > >
> > > > > > Hi Sachin,
> > > > > >
> > > > > > Firstly, I'd recommend you have different state directory configs
> > > > > > for each application instance. I suspect you are potentially
> > > > > > hitting an issue where the partition assignment has changed, the
> > > > > > state directory locks get released, and a directory gets removed
> > > > > > just before the lock is taken out by another thread or process.
> > > > > > Though it is pretty hard to tell from the above logs. You might
> > > > > > also want to set StreamsConfig.CLEANUP_DELAY_MS_CONFIG to a higher
> > > > > > value than the default of 60 seconds. There is no reason why it
> > > > > > couldn't be much higher - 30 minutes, maybe more, depending on how
> > > > > > badly you need old task data cleaned up. Setting this value higher
> > > > > > will reduce the likelihood of the exception occurring.
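> > > > > >
> > > > > > For example (a minimal sketch; the path and the 30 minute value are
> > > > > > just placeholders):
> > > > > >
> > > > > > import java.util.Properties;
> > > > > > import org.apache.kafka.streams.StreamsConfig;
> > > > > >
> > > > > > Properties props = new Properties();
> > > > > > // give this instance its own state directory
> > > > > > props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams/instance-1");
> > > > > > // wait 30 minutes, rather than the default 60 seconds, before
> > > > > > // cleaning up state of tasks this instance no longer owns
> > > > > > props.put(StreamsConfig.CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000L);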
> > > > > >
> > > > > > 0.10.2 will be out shortly. It would be good if you can try that.
> > > > > > You can find the current RC here:
> > > > > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/
> > > > > > You don't need to upgrade your Kafka brokers, just the streams
> > > > > > client.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Mon, 6 Feb 2017 at 10:53 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > Yes, on the first: we have three machines with the same data
> > > > > > > directory setting, so the state dir config is the same for each.
> > > > > > >
> > > > > > > If it helps, this is the sequence of logs just before the thread
> > > > > > > shut down:
> > > > > > >
> > > > > > > ....
> > > > > > >
> > > > > > > stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
> > > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > > >
> > > > > > > stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
> > > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > > >
> > > > > > > stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
> > > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > > No such file or directory
> > > > > > > stream-thread [StreamThread-3] Shutting down
> > > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_35
> > > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_38
> > > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_27
> > > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_12
> > > > > > > stream-thread [StreamThread-3] Closing a task 0_35
> > > > > > > stream-thread [StreamThread-3] Closing a task 0_38
> > > > > > > stream-thread [StreamThread-3] Closing a task 0_27
> > > > > > > stream-thread [StreamThread-3] Closing a task 0_12
> > > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_35
> > > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_38
> > > > > > > stream-thread [StreamThread-3] Failed while executing StreamTask 0_38 due to flush state:
> > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > > No such file or directory
> > > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_27
> > > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_12
> > > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_35
> > > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_38
> > > > > > > stream-thread [StreamThread-3] Failed while executing StreamTask 0_38 due to close state manager:
> > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to close state store key-table
> > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > > No such file or directory
> > > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_27
> > > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_12
> > > > > > > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > > > > > > stream-thread [StreamThread-3] Removing all active tasks [[0_35, 0_38, 0_27, 0_12]]
> > > > > > > stream-thread [StreamThread-3] Removing all standby tasks [[]]
> > > > > > > stream-thread [StreamThread-3] Stream thread shutdown complete
> > > > > > >
> > > > > > > It was working for iterations before that, and then suddenly that
> > > > > > > dir/file was gone and it could not commit/flush/close the state.
> > > > > > >
> > > > > > >
> > > > > > > For the second, what do you recommend? Is there something we can
> > > > > > > patch from 10.2 into 10.1 - certain commits or something?
> > > > > > >
> > > > > > > Or do we again need to upgrade the cluster to 10.2, or if we just
> > > > > > > upgrade the streams client to 10.2 will it work fine?
> > > > > > > Since 10.2 is not released yet, I suppose we would have to build
> > > > > > > the snapshot version.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Sachin,
> > > > > > > >
> > > > > > > > The first exception: is each instance of your streams app on a
> > > > > > > > single machine running with the same state directory config?
> > > > > > > > The second exception: I believe this is a bug in 0.10.1 that has
> > > > > > > > been fixed in 0.10.2. There have been a number of issues fixed
> > > > > > > > in this area.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Damian
> > > > > > > >
> > > > > > > > On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hello All,
> > > > > > > > > We recently upgraded to kafka_2.10-0.10.1.1.
> > > > > > > > >
> > > > > > > > > We have a source topic with replication = 3 and partitions = 40.
> > > > > > > > > We have a streams application run with
> > > > > > > > > NUM_STREAM_THREADS_CONFIG = 4 on three machines. So 12 threads
> > > > > > > > > in total.
> > > > > > > > >
> > > > > > > > > What we do is start the same streams application one by one on
> > > > > > > > > three machines.
> > > > > > > > >
> > > > > > > > > After some time what we noticed was that the streams
> > > > > > > > > application on one of the machines just crashed. When we
> > > > > > > > > inspected the log, here is what we found. There were 2 sets of
> > > > > > > > > errors:
> > > > > > > > >
> > > > > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:267) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201702052100
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:375) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:366) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:256) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:116) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     ... 6 common frames omitted
> > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > > > > No such file or directory
> > > > > > > > >     at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:1360) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:373) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     ... 11 common frames omitted
> > > > > > > > >
> > > > > > > > > When we queried the directory
> > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/
> > > > > > > > > we could not find it.
> > > > > > > > > So it looks like this directory was earlier deleted by some
> > > > > > > > > task, and now some other task is trying to flush it.
> > > > > > > > > What could possibly be the reason for this?
> > > > > > > > >
> > > > > > > > > Another set of errors we see is this:
> > > > > > > > > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store key-table-201702052000 at location
> > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:190) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:159) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     ... 1 common frames omitted
> > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error: lock
> > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000/LOCK:
> > > > > > > > > No locks available
> > > > > > > > >     at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > >     at org.rocksdb.RocksDB.open(RocksDB.java:184) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:183) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > >     ... 26 common frames omitted
> > > > > > > > >
> > > > > > > > > And then the whole application shuts down. We have not
> > > > > > > > > understood this part of the error, nor whether this second
> > > > > > > > > error is somehow related to the first error.
> > > > > > > > >
> > > > > > > > > Please let us know what could be the cause of these errors, or
> > > > > > > > > whether they have been fixed and there is some way to fix this
> > > > > > > > > in the current release.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Sachin
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
