[jira] [Assigned] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics
[ https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mohamed chebbi reassigned KAFKA-10217:
--
Assignee: mohamed chebbi

> Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics
> ---
>
> Key: KAFKA-10217
> URL: https://issues.apache.org/jira/browse/KAFKA-10217
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: mohamed chebbi
> Priority: Minor
> Labels: kip-required, newbie
>
> StreamsMetricsImpl contains several convenience methods for safely registering sensors at different levels of granularity. We added them as internal methods because we weren't sure of their stability and utility. Now, they've been in use for quite a while and seem to be stable.
> We should move them up into the public API so that custom stores and processor implementations can also benefit from their safety.
> Implementing this feature should also allow us to drop the adaptor function:
> `org.apache.kafka.streams.processor.internals.ProcessorContextUtils#getMetricsImpl`
> Note: this feature requires a KIP, but since the API is already pre-determined, it should be uncontroversial. This improvement would be a good opportunity for someone who wants to get an initial KIP under their belt.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics
[ https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148976#comment-17148976 ]

mohamed chebbi commented on KAFKA-10217:

Hi [~vvcephei], I can't find `org.apache.kafka.streams.processor.internals.ProcessorContextUtils` in the Kafka source code. By the way, did you mean `org.apache.kafka.streams.processor.internals.ProcessorContextImpl`?
[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest
[ https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148475#comment-17148475 ]

mohamed chebbi commented on KAFKA-10191:

[~chia7712] Sorry for the late reply. I read the reset tool documentation and, if I understand it correctly, the change is about adding KafkaStreams#cleanUp() in the code; the other steps are done manually, outside the code.

> fix flaky StreamsOptimizedTest
> --
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
>
> {quote}Exception in thread "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
> at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
> at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
> at java.util.TreeMap.put(TreeMap.java:552)
> at java.util.TreeSet.add(TreeSet.java:255)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
> at java.util.TreeSet.addAll(TreeSet.java:312)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
> at
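For readers following the trace: the top frames show the exception being thrown from inside a comparator (`Comparator.comparingLong` chained with `thenComparing`) while elements are added to a `TreeSet`. The sketch below reproduces only that general mechanism with the JDK. It is not Kafka's code; the class `LagOrderingSketch`, its methods, and the task ids are hypothetical names chosen for illustration, and only the exception message is taken from the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical stand-in for per-task lag bookkeeping; NOT Kafka's ClientState.
public class LagOrderingSketch {
    private final Map<String, Long> lagByTask = new HashMap<>();

    public void recordLag(String taskId, long lag) {
        lagByTask.put(taskId, lag);
    }

    // Throws for a task that was never recorded, mirroring the message in the report.
    public long lagFor(String taskId) {
        Long lag = lagByTask.get(taskId);
        if (lag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + taskId);
        }
        return lag;
    }

    // Sorts tasks by lag, then by id. The comparator calls lagFor on every
    // comparison, so one unknown task makes TreeSet.addAll fail part-way through.
    public List<String> orderByLag(Collection<String> tasks) {
        Comparator<String> byLag = Comparator.<String>comparingLong(this::lagFor)
                .thenComparing(Comparator.<String>naturalOrder());
        TreeSet<String> sorted = new TreeSet<>(byLag);
        sorted.addAll(tasks);
        return new ArrayList<>(sorted);
    }

    public static void main(String[] args) {
        LagOrderingSketch sketch = new LagOrderingSketch();
        sketch.recordLag("0_0", 5L);
        sketch.recordLag("1_0", 0L);

        // Both tasks are known, so sorting succeeds.
        System.out.println(sketch.orderByLag(Arrays.asList("0_0", "1_0")));

        try {
            // "2_0" was never recorded, so the comparator throws during insertion.
            sketch.orderByLag(Arrays.asList("0_0", "2_0"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the failure depends only on the input containing a task with no recorded lag. Whether stale local state is what introduces such a task in the real test is the hypothesis discussed in this thread, not something the sketch demonstrates.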
[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting the application up the second time
[ https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17145913#comment-17145913 ]

mohamed chebbi commented on KAFKA-10191:

I want to create a PR for KAFKA-10194, but I don't understand what you mean by the reset tool.

> fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting the application up the second time
> -
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major