mjsax commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r1976511207
########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -419,8 +485,36 @@ private Topology complexTopology() { return builder.build(); } - private Topology simpleTopology() { + private void addGlobalStore(final StreamsBuilder builder) { + builder.addGlobalStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("iq-test-store"), + Serdes.String(), + Serdes.String() + ), Review Comment: Formatting seems to be off? I would recommend ``` builder.addGlobalStore( Stores.keyValueStoreBuilder( Stores.inMemoryKeyValueStore("iq-test-store"), Serdes.String(), Serdes.String() ), globalStoreTopic, [...] ): ``` ########## streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java: ########## @@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter { private final String stateUpdaterThreadId; - public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) { + public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) { this.consumer = Objects.requireNonNull(consumer); this.threadId = Objects.requireNonNull(threadId); - this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId); + this.stateUpdaterThreadId = stateUpdaterThreadId.orElse(""); Review Comment: Not sure. Might actually be cleaner to make `this.stateUpdaterThreadId` and `Optional` by itself, and update the code accordingly when it's used? ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -144,6 +157,51 @@ public void tearDown() throws Exception { if (!streamsSecondApplicationProperties.isEmpty()) { IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties); } + if (globalStoreIterator != null) { + globalStoreIterator.close(); + } + } + + @ParameterizedTest + @ValueSource(strings = {"INFO", "DEBUG", "TRACE"}) + public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception { + streamsApplicationProperties = props(true); + streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel); + IntegrationTestUtils.produceKeyValuesSynchronously(globalStoreTopic, Review Comment: Why do we need to produce input data? Seems it's to trigger `process()` to open the iterator? Wondering if we should avoid this, and creating the iterator in `init()` instead? nit: move `globalStoreTopic` to it's own line to make it easier for the eye to read the code ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -419,8 +485,36 @@ private Topology complexTopology() { return builder.build(); } - private Topology simpleTopology() { + private void addGlobalStore(final StreamsBuilder builder) { + builder.addGlobalStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("iq-test-store"), + Serdes.String(), + Serdes.String() + ), + globalStoreTopic, + Consumed.with(Serdes.String(), Serdes.String()), + () -> new Processor<>() { + private KeyValueStore<String, String> store; + + @Override + public void init(final ProcessorContext<Void, Void> context) { + store = context.getStateStore("iq-test-store"); + } + + @Override + public void process(final Record<String, String> record) { + store.put(record.key(), record.value()); + globalStoreIterator = store.all(); Review Comment: Uff... quick a hack. But I guess difficult (impossible) to do differently? We should add a comment about it though, ie, just c&p your PR comment directly (it's a good explanation, and avoids that somebody spends time tryin to "fix" it, just breaking the test and getting confused, and wasting time). We might also want to add a `@SuppressWarning("resource")` to `process(...)` method? IntelliJ should complain about not using try-with-resource for the iterator? ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -525,7 +619,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client .stream() .flatMap(rm -> rm.getScopeMetricsList().stream()) .flatMap(sm -> sm.getMetricsList().stream()) - .map(metric -> metric.getGauge()) + .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge) Review Comment: The change LGTM. Little bit annoying that we have naming "conflict" for `Metric` but using method reference seems better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org