[ 
https://issues.apache.org/jira/browse/KAFKA-9355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034963#comment-17034963
 ] 

ASF GitHub Bot commented on KAFKA-9355:
---------------------------------------

guozhangwang commented on pull request #7996: KAFKA-9355: Fix bug that removed 
RocksDB metrics after failure in EOS
URL: https://github.com/apache/kafka/pull/7996


> RocksDB statistics are removed from JMX when EOS enabled and empty local 
> state dir
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9355
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.4.0
>            Reporter: Stanislav Savulchik
>            Assignee: Bruno Cadonna
>            Priority: Major
>         Attachments: metric-removal.log
>
>
> *Steps to Reproduce*
> Set processing.guarantee = exactly_once and remove the local state dir to 
> force state restoration from the changelog topics, which must be non-empty.
> *Expected Behavior*
> After streams task state restoration, MBeans like the following are 
> registered for persistent RocksDB KeyValueStores:
> kafka.streams:type=stream-state-metrics,client-id=<application.id>-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,task-id=0_0,rocksdb-state-id=<state.store>
> *Actual Behavior*
> No such MBeans are registered after streams task state restoration.
> *Details*
> I injected a custom MetricsReporter to log metricChange and metricRemoval 
> calls. According to the logs, at some point the missing metrics are removed 
> and never restored. Here is an excerpt for the number-open-files metric:
> {noformat}
> 16:33:40.403 DEBUG c.m.r.LoggingMetricsReporter - metricChange MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 INFO  o.a.k.s.p.i.StoreChangelogReader - stream-thread 
> [morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1] No 
> checkpoint found for task 0_0 state store buffered-event changelog 
> morpheus.conversion-buffered-event-changelog-0 with EOS turned on. 
> Reinitializing the task and restore its state from the beginning.
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing metrics recorder for store 
> buffered-event of task 0_0 from metrics recording trigger
> 16:33:40.611 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> ...
> (no more calls to metricChange for the removed number-open-files 
> metric){noformat}
> A complete log is attached: [^metric-removal.log]
> The metric removal happens along this call stack:
> {noformat}
> 19:27:35.509 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-9b76f302-7149-47de-b17b-362d642e05d5-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> java.lang.Exception: null
>    at casino.morpheus.reporter.LoggingMetricsReporter.metricRemoval(LoggingMetricsReporter.scala:24)
>    at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:534)
>    at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:448)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.removeAllStoreLevelSensors(StreamsMetricsImpl.java:440)
>    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.close(MeteredKeyValueStore.java:345)
>    at org.apache.kafka.streams.processor.internals.StateManagerUtil.reinitializeStateStoresForPartitions(StateManagerUtil.java:93)
>    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:190)
>    at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:215)
>    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:234)
>    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
>    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
>    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671){noformat}
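
For readers who want to follow the event order in the log excerpt without a Kafka setup, here is a minimal toy model in Java. It is only a sketch: `Reporter`, `Registry` and `replay` are hypothetical names invented for illustration and are not Kafka classes. The callbacks merely mirror the two method names logged in the report (metricChange and metricRemoval). The model replays the observed order (register, remove during reinitialization, no re-registration) and makes no claim about the root cause inside Kafka Streams.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: none of these types are Kafka classes.
class MetricRemovalSketch {

    /** Hypothetical stand-in for the two reporter callbacks logged in the report. */
    interface Reporter {
        void metricChange(String metricName);
        void metricRemoval(String metricName);
    }

    /** Hypothetical registry that notifies its reporter on every add and remove. */
    static final class Registry {
        private final Set<String> metrics = new HashSet<>();
        private final Reporter reporter;

        Registry(Reporter reporter) {
            this.reporter = reporter;
        }

        void add(String name) {
            if (metrics.add(name)) {
                reporter.metricChange(name);
            }
        }

        void remove(String name) {
            if (metrics.remove(name)) {
                reporter.metricRemoval(name);
            }
        }

        boolean contains(String name) {
            return metrics.contains(name);
        }
    }

    /** Replays the order of events seen in the log excerpt and returns what the reporter saw. */
    static List<String> replay() {
        List<String> events = new ArrayList<>();
        Reporter logging = new Reporter() {
            @Override
            public void metricChange(String metricName) {
                events.add("metricChange " + metricName);
            }

            @Override
            public void metricRemoval(String metricName) {
                events.add("metricRemoval " + metricName);
            }
        };
        Registry registry = new Registry(logging);
        String metric = "number-open-files";

        registry.add(metric);    // 16:33:40.403: metric registered when the store is first opened
        registry.remove(metric); // 16:33:40.611: metric removed while the task is reinitialized
        // 16:33:40.625: the recorder and statistics are added again, but the log
        // shows no further metricChange, so nothing is re-registered here.

        events.add("registered at end: " + registry.contains(metric));
        return events;
    }

    public static void main(String[] args) {
        replay().forEach(System.out::println);
    }
}
```

The last recorded entry shows the metric is absent once the replay finishes, which matches the missing MBeans described under Actual Behavior.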


