kael-aiur opened a new issue, #12503: URL: https://github.com/apache/skywalking/issues/12503
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/skywalking/issues?q=is%3Aissue) and found no similar issues.

### Apache SkyWalking Component

OAP server (apache/skywalking)

### What happened

I have a custom meter yaml:

```yaml
filter: "{ tags -> tags['service'] != null && tags['instance'] != null && tags['resource'] != null }"
expSuffix: instance(['service'],['instance'], Layer.GENERAL)
metricPrefix: sentinel
metricsRules:
  - name: sw8_receive_request_total_sum
    exp: sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')
  - name: sw8_pass_requests_total_sum
    exp: sw8_pass_requests_total.sum(['service','instance','resource']).increase('PT1M')
```

When many instances report this meter, I sometimes get an exception stack like the one below:

```
2024-08-02 13:58:24,274 org.apache.skywalking.oap.meter.analyzer.dsl.Expression 89 [KafkaConsumer-1-thread-14] ERROR [] - [9.5.0-d47165d] failed to run "(sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')).instance(['service'],['instance'], Layer.GENERAL)"
java.lang.NullPointerException: Cannot read field "_1" because "t2" is null
	at io.vavr.Tuple2.compareTo(Tuple2.java:90) ~[vavr-0.10.3.jar:?]
	at io.vavr.Tuple2.compareTo(Tuple2.java:111) ~[vavr-0.10.3.jar:?]
	at io.vavr.Tuple2.compareTo(Tuple2.java:43) ~[vavr-0.10.3.jar:?]
	at java.util.PriorityQueue.siftDownComparable(PriorityQueue.java:694) ~[?:?]
	at java.util.PriorityQueue.poll(PriorityQueue.java:583) ~[?:?]
	at org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow.increase(CounterWindow.java:62) ~[classes/:?]
	at org.apache.skywalking.oap.meter.analyzer.dsl.Sample.increase(Sample.java:50) ~[classes/:?]
	at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily.lambda$increase$16(SampleFamily.java:279) ~[classes/:?]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[?:?]
	at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily.increase(SampleFamily.java:280) ~[classes/:?]
	at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily$increase$0.call(Unknown Source) ~[?:?]
	at Script1.run(Script1.groovy:1) ~[?:?]
	at org.apache.skywalking.oap.meter.analyzer.dsl.Expression.run(Expression.java:78) ~[classes/:?]
	at org.apache.skywalking.oap.meter.analyzer.Analyzer.analyse(Analyzer.java:133) ~[classes/:?]
	at org.apache.skywalking.oap.meter.analyzer.MetricConvert.toMeter(MetricConvert.java:109) ~[classes/:?]
	at org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor.lambda$process$6(MeterProcessor.java:135) ~[classes/:?]
	at java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
	at org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor.process(MeterProcessor.java:135) ~[classes/:?]
	at org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.MeterServiceHandler.handle(MeterServiceHandler.java:85) ~[classes/:?]
	at org.apache.skywalking.oap.server.analyzer.agent.kafka.KafkaFetcherHandlerRegister.lambda$runTask$1(KafkaFetcherHandlerRegister.java:122) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:840) [?:?]
```

Because `org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow` is a singleton, its `windows` map (a `ConcurrentHashMap`) returns the same `PriorityQueue` object for the same meter. `PriorityQueue` is not thread-safe, so when `increase` is called concurrently for the same meter, unexpected exceptions occur inside the `PriorityQueue` object. Source code:

```java
public class CounterWindow {

    public static final CounterWindow INSTANCE = new CounterWindow();

    private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>();
    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>();

    public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
        ID id = new ID(name, labels);
        // concurrent callers for the same meter get the same PriorityQueue object here
        Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
        window.offer(Tuple.of(now, value)); // unexpected errors can occur on this line under concurrent calls
        long waterLevel = now - windowSize;
        Tuple2<Long, Double> peek = window.peek();
        if (peek._1 > waterLevel) {
            return peek;
        }

        Tuple2<Long, Double> result = peek;
        while (peek._1 < waterLevel) {
            result = window.poll(); // unexpected errors can occur on this line under concurrent calls
            peek = window.element(); // unexpected errors can occur on this line under concurrent calls
        }

        // Choose the closed slot to the expected timestamp
        if (waterLevel - result._1 <= peek._1 - waterLevel) {
            return result;
        }

        return peek;
    }
    //....
}
```

### What you expected to happen

Operations on the same `PriorityQueue` should be synchronized, for example by locking on the window:

```java
public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
    ID id = new ID(name, labels);
    Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
    synchronized (window) { // use the synchronized keyword to lock the window
        window.offer(Tuple.of(now, value));
        long waterLevel = now - windowSize;
        Tuple2<Long, Double> peek = window.peek();
        if (peek._1 > waterLevel) {
            return peek;
        }

        Tuple2<Long, Double> result = peek;
        while (peek._1 < waterLevel) {
            result = window.poll();
            peek = window.element();
        }

        // Choose the closed slot to the expected timestamp
        if (waterLevel - result._1 <= peek._1 - waterLevel) {
            return result;
        }

        return peek;
    }
}
```

### How to reproduce

Configure a custom meter whose expression contains an `increase` call, such as `sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')`. Then set a debug breakpoint and wait until a number of samples for that meter have been reported. Release the breakpoint so that they all run at once; the problem reproduces with high probability.

### Anything else

When reproduced this way, you can see that some queues are completely corrupted (size < 0), and such a queue fails permanently until the service is restarted.

### Are you willing to submit a pull request to fix on your own?

- [X] Yes I am willing to submit a pull request on my own!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
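To illustrate the proposed locking outside of the OAP code base, here is a minimal standalone sketch. The names `WindowLockSketch`, `record`, `size`, and `runConcurrently` are hypothetical, a plain `Long` timestamp stands in for the `Tuple2<Long, Double>` entries, and a `String` stands in for `ID`; it is not the actual `CounterWindow` implementation, only the same per-queue `synchronized` pattern.

```java
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WindowLockSketch {

    // Hypothetical stand-in for CounterWindow.windows: one plain PriorityQueue per meter ID.
    private final Map<String, Queue<Long>> windows = new ConcurrentHashMap<>();

    /** Adds a timestamp, drops entries older than the water level, and returns the remaining size. */
    public int record(String id, long now, long windowSize) {
        Queue<Long> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
        synchronized (window) { // all access to the shared queue happens under its monitor
            window.offer(now);
            long waterLevel = now - windowSize;
            // The entry just offered is never below waterLevel (windowSize >= 0),
            // so the queue cannot become empty inside this loop.
            while (window.peek() < waterLevel) {
                window.poll();
            }
            return window.size();
        }
    }

    /** Returns the current number of entries for the given ID, or 0 if none exist. */
    public int size(String id) {
        Queue<Long> window = windows.get(id);
        if (window == null) {
            return 0;
        }
        synchronized (window) {
            return window.size();
        }
    }

    /** Hammers a single meter ID from several threads and returns the final queue size. */
    public static int runConcurrently(int threads, int callsPerThread) {
        WindowLockSketch sketch = new WindowLockSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    // The window is large enough that nothing is trimmed in this run.
                    sketch.record("meter", i, 1_000_000_000L);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.size("meter");
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8, 5000));
    }
}
```

With the `synchronized` block in place, every `offer` is retained, so `runConcurrently(8, 5000)` should leave 40000 entries in the queue. Removing the block makes the count unreliable and can corrupt the queue in the way described above.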
