[jira] [Assigned] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics

2020-07-04 Thread mohamed chebbi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mohamed chebbi reassigned KAFKA-10217:
--

Assignee: mohamed chebbi

> Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to 
> StreamsMetrics
> ---
>
>                 Key: KAFKA-10217
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10217
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: mohamed chebbi
>            Priority: Minor
>              Labels: kip-required, newbie
>
> StreamsMetricsImpl contains several convenience methods for safely 
> registering sensors at different levels of granularity. We added them as 
> internal methods because we weren't sure of their stability and utility. Now, 
> they've been in use for quite a while and seem to be stable.
> We should move them up into the public API so that custom stores and 
> processor implementations can also benefit from their safety.
> Implementing this feature should also allow us to drop the adaptor function: 
> `org.apache.kafka.streams.processor.internals.ProcessorContextUtils#getMetricsImpl`
> Note: this feature requires a KIP, but since the API is already 
> pre-determined, it should be uncontroversial. This improvement would be a 
> good opportunity for someone who wants to get an initial KIP under their belt.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics

2020-06-30 Thread mohamed chebbi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148976#comment-17148976
 ] 

mohamed chebbi commented on KAFKA-10217:


Hi [~vvcephei]

I can't find 
`org.apache.kafka.streams.processor.internals.ProcessorContextUtils` in the 
Kafka source code.

By the way, did you mean 
`org.apache.kafka.streams.processor.internals.ProcessorContextImpl`?

> Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to 
> StreamsMetrics
> ---
>
>                 Key: KAFKA-10217
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10217
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Minor
>              Labels: kip-required, newbie
>





[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest

2020-06-30 Thread mohamed chebbi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148475#comment-17148475
 ] 

mohamed chebbi commented on KAFKA-10191:


[~chia7712] Sorry for the late reply. I read the reset tool documentation and, 
if I understand it correctly, the only change needed in the code is adding a 
call to KafkaStreams#cleanUp(); the other steps are done manually, outside the 
code.

> fix flaky StreamsOptimizedTest
> --
>
>                 Key: KAFKA-10191
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10191
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> {quote}Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
>  at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
>  at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
>  at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
>  at java.util.TreeMap.put(TreeMap.java:552)
>  at java.util.TreeSet.add(TreeSet.java:255)
>  at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
>  at java.util.TreeSet.addAll(TreeSet.java:312)
>  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
>  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
>  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
>  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204)
>  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
>  at 
> 
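
For readers following the trace: its top frames show the exception escaping from a comparator (comparingLong, then thenComparing) while TreeSet.addAll inserts elements. The sketch below reproduces only that mechanism using the JDK, under stated assumptions. The class UnknownTaskLagSketch, its lag map, and the task ids are hypothetical stand-ins, not Kafka code, and it does not claim to show why task 2_0 was unknown in the flaky test.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for per-client lag bookkeeping; not the Kafka class.
class UnknownTaskLagSketch {

    private final Map<String, Long> taskLags = new HashMap<>();

    public void recordLag(String taskId, long lag) {
        taskLags.put(taskId, lag);
    }

    // Throws for tasks that were never recorded, mirroring the message in the trace.
    public long lagFor(String taskId) {
        Long lag = taskLags.get(taskId);
        if (lag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + taskId);
        }
        return lag;
    }

    // Orders tasks by lag, then by id. The comparator calls lagFor on every
    // insertion, so a single unknown task makes addAll throw.
    public SortedSet<String> tasksByLag(List<String> taskIds) {
        Comparator<String> byLag = Comparator.comparingLong(this::lagFor);
        Comparator<String> byLagThenId = byLag.thenComparing(Comparator.naturalOrder());
        SortedSet<String> sorted = new TreeSet<>(byLagThenId);
        sorted.addAll(taskIds);
        return sorted;
    }

    public static void main(String[] args) {
        UnknownTaskLagSketch state = new UnknownTaskLagSketch();
        state.recordLag("0_0", 10L);
        state.recordLag("1_0", 5L);

        // Both tasks are known, so sorting succeeds: prints [1_0, 0_0]
        System.out.println(state.tasksByLag(Arrays.asList("0_0", "1_0")));

        try {
            // "2_0" was never recorded, so the comparator throws inside addAll.
            state.tasksByLag(Arrays.asList("0_0", "1_0", "2_0"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the failure surfaces at sort time rather than at the point where the unknown task entered the list, which is consistent with the trace pointing at the comparator instead of at whatever produced the stale task id.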

[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting the application up the second time

2020-06-25 Thread mohamed chebbi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17145913#comment-17145913
 ] 

mohamed chebbi commented on KAFKA-10191:


I want to create a PR for KAFKA-10194, but I don't understand what you mean 
by the reset tool.

> fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting 
> the application up the second time
> -
>
>                 Key: KAFKA-10191
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10191
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>