[ https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17349475#comment-17349475 ]
Marco Lotz edited comment on KAFKA-5676 at 5/21/21, 9:04 PM: ------------------------------------------------------------- [~mjsax] [~guozhang] I can take over this ticket if nobody minds. I had a look at the code and indeed there's quite a problem of dependency inversion here. Tons of classes using behaviours only available in StreamsMetricsImpl instead of StreamsMetrics. Among the public behaviours not declared in the interface are: {code:java} Version version(); addClientLevelMutableMetric(); and many more{code} An example of calls that directly use the implementation is in ProcessorNodeMetrics#addStateMetric {code:java} public static void addStateMetric(final StreamsMetricsImpl streamsMetrics, final Gauge<State> stateProvider) { streamsMetrics.addClientLevelMutableMetric( <--- This one is only available in the implementation STATE, STATE_DESCRIPTION, RecordingLevel.INFO, stateProvider ); } {code} It seems to me that the interface StreamMetrics is missing many behaviours. I can work on a PR to move those behaviours up to the interface and then we work together from there. Edit: Extra example, even interfaces are returning the implementation: {code:java} public interface InternalProcessorContext { ... @Override StreamsMetricsImpl metrics(); }{code} was (Author: marcolotz): [~mjsax] [~guozhang] I can take over this ticket if nobody minds. I had a look at the code and indeed there's quite a problem of dependency inversion here. Tons of classes using behaviours only available in StreamsMetricsImpl instead of StreamsMetrics. Among the public behaviours not declared in the interface are: {code:java} Version version(); addClientLevelMutableMetric(); and many more{code} An example of calls that directly use the implementation is in ProcessorNodeMetrics#addStateMetric {code:java} public static void addStateMetric(final StreamsMetricsImpl streamsMetrics, final Gauge<State> stateProvider) { streamsMetrics.addClientLevelMutableMetric( <--- This one is only available in the implementation STATE, STATE_DESCRIPTION, RecordingLevel.INFO, stateProvider ); } {code} It seems to me that the interface StreamMetrics is missing many behaviours. I can work on a PR to move those behaviours up to the interface and then we work together from there. > MockStreamsMetrics should be in o.a.k.test > ------------------------------------------ > > Key: KAFKA-5676 > URL: https://issues.apache.org/jira/browse/KAFKA-5676 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Guozhang Wang > Assignee: Marco Lotz > Priority: Major > Labels: newbie > Time Spent: 96h > Remaining Estimate: 0h > > {{MockStreamsMetrics}}'s package should be `o.a.k.test` not > `o.a.k.streams.processor.internals`. > In addition, it should not require a {{Metrics}} parameter in its constructor > as it is only needed for its extended base class; the right way of mocking > should be implementing {{StreamsMetrics}} with mock behavior than extended a > real implementaion of {{StreamsMetricsImpl}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)