kael-aiur opened a new issue, #12503:
URL: https://github.com/apache/skywalking/issues/12503

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/skywalking/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Apache SkyWalking Component
   
   OAP server (apache/skywalking)
   
   ### What happened
   
   I have a custom meter YAML configuration:
   ```yaml
   filter: "{ tags -> tags['service'] != null && tags['instance'] != null && tags['resource'] != null }"
   expSuffix: instance(['service'],['instance'], Layer.GENERAL)
   metricPrefix: sentinel
   metricsRules:
     - name: sw8_receive_request_total_sum
       exp: sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')
     - name: sw8_pass_requests_total_sum
       exp: sw8_pass_requests_total.sum(['service','instance','resource']).increase('PT1M')
   ```
   
   When many instances report this meter, I sometimes get an exception stack like the one below:
   ```
   2024-08-02 13:58:24,274 org.apache.skywalking.oap.meter.analyzer.dsl.Expression 89 [KafkaConsumer-1-thread-14] ERROR [] - [9.5.0-d47165d] failed to run "(sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')).instance(['service'],['instance'], Layer.GENERAL)"
   java.lang.NullPointerException: Cannot read field "_1" because "t2" is null
        at io.vavr.Tuple2.compareTo(Tuple2.java:90) ~[vavr-0.10.3.jar:?]
        at io.vavr.Tuple2.compareTo(Tuple2.java:111) ~[vavr-0.10.3.jar:?]
        at io.vavr.Tuple2.compareTo(Tuple2.java:43) ~[vavr-0.10.3.jar:?]
        at java.util.PriorityQueue.siftDownComparable(PriorityQueue.java:694) ~[?:?]
        at java.util.PriorityQueue.poll(PriorityQueue.java:583) ~[?:?]
        at org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow.increase(CounterWindow.java:62) ~[classes/:?]
        at org.apache.skywalking.oap.meter.analyzer.dsl.Sample.increase(Sample.java:50) ~[classes/:?]
        at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily.lambda$increase$16(SampleFamily.java:279) ~[classes/:?]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) ~[?:?]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
        at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[?:?]
        at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily.increase(SampleFamily.java:280) ~[classes/:?]
        at org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily$increase$0.call(Unknown Source) ~[?:?]
        at Script1.run(Script1.groovy:1) ~[?:?]
        at org.apache.skywalking.oap.meter.analyzer.dsl.Expression.run(Expression.java:78) ~[classes/:?]
        at org.apache.skywalking.oap.meter.analyzer.Analyzer.analyse(Analyzer.java:133) ~[classes/:?]
        at org.apache.skywalking.oap.meter.analyzer.MetricConvert.toMeter(MetricConvert.java:109) ~[classes/:?]
        at org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor.lambda$process$6(MeterProcessor.java:135) ~[classes/:?]
        at java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
        at org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor.process(MeterProcessor.java:135) ~[classes/:?]
        at org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.MeterServiceHandler.handle(MeterServiceHandler.java:85) ~[classes/:?]
        at org.apache.skywalking.oap.server.analyzer.agent.kafka.KafkaFetcherHandlerRegister.lambda$runTask$1(KafkaFetcherHandlerRegister.java:122) ~[classes/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:840) [?:?]
   ```
   
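   Because `org.apache.skywalking.oap.meter.analyzer.dsl.counter.CounterWindow` is a singleton (`CounterWindow.INSTANCE`), its `windows` map, a `ConcurrentHashMap`, returns the same `PriorityQueue` object for every call on the same meter (same name and labels). `ConcurrentHashMap` only makes the map operations thread-safe; `PriorityQueue` itself is not synchronized. When `increase` is called concurrently for the same meter, the shared `PriorityQueue` gets corrupted and throws unexpected exceptions. Source code: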
   
   ```java
   public class CounterWindow {

       public static final CounterWindow INSTANCE = new CounterWindow();

       private final Map<ID, Tuple2<Long, Double>> lastElementMap = new ConcurrentHashMap<>();
       private final Map<ID, Queue<Tuple2<Long, Double>>> windows = new ConcurrentHashMap<>();

       public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
           ID id = new ID(name, labels);
           // concurrent callers for the same meter all get the same PriorityQueue object here
           Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
           window.offer(Tuple.of(now, value));  // unexpected errors occur on this line under concurrent access
           long waterLevel = now - windowSize;
           Tuple2<Long, Double> peek = window.peek();
           if (peek._1 > waterLevel) {
               return peek;
           }

           Tuple2<Long, Double> result = peek;
           while (peek._1 < waterLevel) {
               result = window.poll();   // unexpected errors occur on this line under concurrent access
               peek = window.element();  // unexpected errors occur on this line under concurrent access
           }

           // Choose the closest slot to the expected timestamp
           if (waterLevel - result._1 <= peek._1 - waterLevel) {
               return result;
           }

           return peek;
       }
       //....
   }
   ```
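
   To make the root cause concrete, here is a minimal stand-alone sketch (not SkyWalking code; `SharedWindowDemo` and its simplified `ID` record are hypothetical stand-ins) showing that `ConcurrentHashMap.computeIfAbsent` hands every caller, on any thread, the same queue instance for a given key. The map is thread-safe, but what it returns is a plain `PriorityQueue`, which the JDK documents as not synchronized, so nothing protects the subsequent `offer`/`poll` calls.

   ```java
   import java.util.Map;
   import java.util.PriorityQueue;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicReference;

   class SharedWindowDemo {
       // Hypothetical stand-in for CounterWindow's ID: meter name plus labels.
       record ID(String name, Map<String, String> labels) { }

       static final Map<ID, Queue<Long>> WINDOWS = new ConcurrentHashMap<>();

       static Queue<Long> windowFor(ID id) {
           // computeIfAbsent is atomic: at most one queue is created per key,
           // and every caller receives that same (unsynchronized) instance.
           return WINDOWS.computeIfAbsent(id, unused -> new PriorityQueue<>());
       }

       /** Looks up the same meter from two threads; true if both got the same queue object. */
       static boolean sameQueueFromTwoThreads() {
           ID id = new ID("sw8_receive_request_total", Map.of("service", "a", "instance", "i1"));
           AtomicReference<Queue<Long>> fromT1 = new AtomicReference<>();
           AtomicReference<Queue<Long>> fromT2 = new AtomicReference<>();
           Thread t1 = new Thread(() -> fromT1.set(windowFor(id)));
           Thread t2 = new Thread(() -> fromT2.set(windowFor(id)));
           t1.start();
           t2.start();
           try {
               t1.join();
               t2.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return fromT1.get() == fromT2.get();
       }

       public static void main(String[] args) {
           System.out.println(sameQueueFromTwoThreads()); // true
       }
   }
   ```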
   
   
   ### What you expected to happen
   
   for the same PriorityQueue, here a synchronization lock should be added to 
the window to ensure synchronization operations:
   ```java
       public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
           ID id = new ID(name, labels);
           Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
           synchronized (window) { // lock the per-meter window so offer/peek/poll run atomically
               window.offer(Tuple.of(now, value));
               long waterLevel = now - windowSize;
               Tuple2<Long, Double> peek = window.peek();
               if (peek._1 > waterLevel) {
                   return peek;
               }

               Tuple2<Long, Double> result = peek;
               while (peek._1 < waterLevel) {
                   result = window.poll();
                   peek = window.element();
               }

               // Choose the closest slot to the expected timestamp
               if (waterLevel - result._1 <= peek._1 - waterLevel) {
                   return result;
               }

               return peek;
           }
       }
   ```
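
   A self-contained sketch of the proposed fix, runnable without SkyWalking or vavr. `Point` is a hypothetical stand-in for `Tuple2<Long, Double>` (ordered by timestamp, then value) and `ID` is simplified to a record; the windowing logic mirrors the snippet above. `main` walks a 60-second window through four samples to show which sample `increase` returns.

   ```java
   import java.util.Map;
   import java.util.PriorityQueue;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentHashMap;

   class SynchronizedCounterWindow {
       // Hypothetical stand-in for vavr's Tuple2<Long, Double>: compares timestamp, then value.
       record Point(long ts, double value) implements Comparable<Point> {
           @Override
           public int compareTo(Point o) {
               int c = Long.compare(ts, o.ts);
               return c != 0 ? c : Double.compare(value, o.value);
           }
       }

       // Hypothetical simplified key: meter name plus labels.
       record ID(String name, Map<String, String> labels) { }

       private final Map<ID, Queue<Point>> windows = new ConcurrentHashMap<>();

       Point increase(String name, Map<String, String> labels, double value, long windowSize, long now) {
           Queue<Point> window = windows.computeIfAbsent(new ID(name, labels), unused -> new PriorityQueue<>());
           // Lock only this meter's queue: different meters never contend with each other.
           synchronized (window) {
               window.offer(new Point(now, value));
               long waterLevel = now - windowSize;
               Point peek = window.peek();
               if (peek.ts() > waterLevel) {
                   return peek;
               }
               Point result = peek;
               // Drop samples older than the water level. Assuming windowSize >= 0, the sample
               // just offered (ts == now) ends the loop, so element() never sees an empty queue.
               while (peek.ts() < waterLevel) {
                   result = window.poll();
                   peek = window.element();
               }
               // Return whichever of the two neighbours is closest to the water level.
               return (waterLevel - result.ts() <= peek.ts() - waterLevel) ? result : peek;
           }
       }

       public static void main(String[] args) {
           SynchronizedCounterWindow w = new SynchronizedCounterWindow();
           Map<String, String> labels = Map.of("service", "a");
           long minute = 60_000L;
           System.out.println(w.increase("m", labels, 1.0, minute, 0L));       // ts=0, value=1.0
           System.out.println(w.increase("m", labels, 2.0, minute, 30_000L));  // ts=0, value=1.0
           System.out.println(w.increase("m", labels, 5.0, minute, 70_000L));  // ts=0, value=1.0
           System.out.println(w.increase("m", labels, 7.0, minute, 100_000L)); // ts=30000, value=2.0
       }
   }
   ```

   Locking per window rather than on the whole `CounterWindow` keeps unrelated meters from serializing each other. Switching the queue to `PriorityBlockingQueue` alone would not be enough, because the offer/peek/poll sequence has to be atomic as a whole, not just each call individually.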
   
   ### How to reproduce
   
   Configure a custom meter rule that contains an `increase` call, such as `sw8_receive_request_total.sum(['service','instance','resource']).increase('PT1M')`. Set a debug breakpoint so that a batch of these custom meters is reported and held, then release the breakpoint and let all threads run. The exception reproduces with high probability.
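
   A debugger-free variant of these reproduction steps, as a hedged sketch: a `CountDownLatch` plays the role of the breakpoint by parking many threads and releasing them together against one shared `PriorityQueue`. `WindowRaceRepro`, `Outcome`, and the thread/operation counts are hypothetical choices, not taken from SkyWalking. With `synchronize = true` a run should report zero errors and an empty queue; with `false`, exceptions and a corrupted size (like the `size < 0` state described below) are likely but not guaranteed on any given run.

   ```java
   import java.util.PriorityQueue;
   import java.util.Queue;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   class WindowRaceRepro {
       record Outcome(int errors, int finalSize) { }

       static Outcome run(int threads, int opsPerThread, boolean synchronize) {
           Queue<Long> shared = new PriorityQueue<>();
           CountDownLatch startGate = new CountDownLatch(1);
           AtomicInteger errors = new AtomicInteger();
           ExecutorService pool = Executors.newFixedThreadPool(threads);
           for (int t = 0; t < threads; t++) {
               pool.execute(() -> {
                   try {
                       startGate.await(); // workers park here, like threads held at a breakpoint
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       return;
                   }
                   for (long i = 0; i < opsPerThread; i++) {
                       try {
                           if (synchronize) {
                               synchronized (shared) { offerThenRemove(shared, i); }
                           } else {
                               offerThenRemove(shared, i);
                           }
                       } catch (RuntimeException e) { // e.g. NullPointerException, NoSuchElementException
                           errors.incrementAndGet();
                       }
                   }
               });
           }
           startGate.countDown(); // release every worker at once
           pool.shutdown();
           try {
               if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                   pool.shutdownNow();
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
           synchronized (shared) { // read the final size under the same lock the workers used
               return new Outcome(errors.get(), shared.size());
           }
       }

       // One offer then one remove: net size change is zero when the pair runs atomically.
       private static void offerThenRemove(Queue<Long> q, long value) {
           q.offer(value);
           q.remove(); // throws NoSuchElementException if the queue unexpectedly looks empty
       }

       public static void main(String[] args) {
           System.out.println("synchronized:   " + run(16, 100_000, true));
           System.out.println("unsynchronized: " + run(16, 100_000, false));
       }
   }
   ```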
   
   ### Anything else
   
   
![image](https://github.com/user-attachments/assets/3f5d747a-5774-4afd-92b3-2c7ada895c56)
   As shown above, some queues end up completely corrupted (`size < 0`), and such a queue keeps failing permanently until the OAP server is restarted.
   
   ### Are you willing to submit a pull request to fix on your own?
   
   - [X] Yes I am willing to submit a pull request on my own!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   