This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new b5cd3fad7e Support `sumPerMinLabeled` in `MAL` (#9804)
b5cd3fad7e is described below

commit b5cd3fad7e21782a254b83d1fa67cdc14c391baf
Author: mrproliu <[email protected]>
AuthorDate: Wed Oct 19 13:50:58 2022 +0800

    Support `sumPerMinLabeled` in `MAL` (#9804)
---
 docs/en/changes/changes.md                         |   1 +
 .../skywalking/oap/meter/analyzer/Analyzer.java    |   5 +-
 .../function/sumpermin/SumPerMinFunction.java      |   2 +-
 ...Function.java => SumPerMinLabeledFunction.java} | 112 ++++++++--------
 .../sumpermin/SumPerMinLabeledFunctionTest.java    | 142 +++++++++++++++++++++
 5 files changed, 197 insertions(+), 65 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 7c02662bcc..3cdd10eab8 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -62,6 +62,7 @@
   is acceptable in our case.
 * Optimize the query time of tasks in ProfileTaskCache.
 * Fix metrics was put into wrong slot of the window in the alerting kernel.
+* Support `sumPerMinLabeled` in `MAL`.
 
 #### UI
 
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index cf406a375e..90a0a9e83d 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -34,6 +34,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.CaseUtils;
 import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
 import org.apache.skywalking.oap.meter.analyzer.dsl.DownsamplingType;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
@@ -247,8 +248,8 @@ public class Analyzer {
     private void createMetric(final ScopeType scopeType,
                               final String dataType,
                               final DownsamplingType downsamplingType) {
-        String functionName = String.format(
-            "%s%s", downsamplingType.toString().toLowerCase(), 
StringUtils.capitalize(dataType));
+        String downSamplingStr = 
CaseUtils.toCamelCase(downsamplingType.toString().toLowerCase(), false, '_');
+        String functionName = String.format("%s%s", downSamplingStr, 
StringUtils.capitalize(dataType));
         meterSystem.create(metricName, functionName, scopeType);
     }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
index 2d7aeb5c32..4774e06ce0 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
@@ -43,7 +43,7 @@ import 
org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import java.util.Objects;
 
 @ToString
-@MeterFunction(functionName = "sum_per_min")
+@MeterFunction(functionName = "sumPerMin")
 public abstract class SumPerMinFunction extends Meter implements 
AcceptableValue<Long>, LongValueHolder {
     protected static final String VALUE = "value";
     protected static final String TOTAL = "total";
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunction.java
similarity index 68%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunction.java
index 2d7aeb5c32..d246370f9b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunction.java
@@ -20,7 +20,6 @@ package 
org.apache.skywalking.oap.server.core.analysis.meter.function.sumpermin;
 
 import lombok.Getter;
 import lombok.Setter;
-import lombok.ToString;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
@@ -28,11 +27,11 @@ import 
org.apache.skywalking.oap.server.core.analysis.meter.Meter;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import 
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
 import 
org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
-import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import 
org.apache.skywalking.oap.server.core.analysis.metrics.LabeledValueHolder;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import 
org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
 import 
org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
-import org.apache.skywalking.oap.server.core.query.sql.Function;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@@ -42,11 +41,11 @@ import 
org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 
 import java.util.Objects;
 
-@ToString
-@MeterFunction(functionName = "sum_per_min")
-public abstract class SumPerMinFunction extends Meter implements 
AcceptableValue<Long>, LongValueHolder {
-    protected static final String VALUE = "value";
-    protected static final String TOTAL = "total";
+@MeterFunction(functionName = "sumPerMinLabeled")
+public abstract class SumPerMinLabeledFunction extends Meter implements 
AcceptableValue<DataTable>, LabeledValueHolder {
+
+    protected static final String VALUE = "datatable_value";
+    protected static final String TOTAL = "datatable_total";
 
     @Setter
     @Getter
@@ -61,34 +60,52 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
 
     @Getter
     @Setter
-    @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, 
function = Function.Avg)
-    private long value;
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, 
storageOnly = true)
+    private DataTable value = new DataTable(30);
 
     @Getter
     @Setter
     @Column(columnName = TOTAL, storageOnly = true)
-    private long total;
+    private DataTable total = new DataTable(30);
 
     @Entrance
-    public final void combine(@SourceFrom long value) {
-        this.total += value;
+    public final void combine(@SourceFrom DataTable value) {
+        this.total.append(value);
+    }
+
+    @Override
+    public void accept(MeterEntity entity, DataTable value) {
+        setEntityId(entity.id());
+        setServiceId(entity.serviceId());
+        this.total.append(value);
+    }
+
+    @Override
+    public Class<? extends StorageBuilder> builder() {
+        return SumPerMinLabeledStorageBuilder.class;
     }
 
     @Override
     public boolean combine(Metrics metrics) {
-        final SumPerMinFunction sumPerMinFunction = (SumPerMinFunction) 
metrics;
-        combine(sumPerMinFunction.getTotal());
+        final SumPerMinLabeledFunction sumPerMinLabeledFunction = 
(SumPerMinLabeledFunction) metrics;
+        combine(sumPerMinLabeledFunction.getTotal());
         return true;
     }
 
     @Override
     public void calculate() {
-        setValue(this.total / getDurationInMinute());
+        for (String key : total.keys()) {
+            final Long val = total.get(key);
+            if (Objects.isNull(val)) {
+                continue;
+            }
+            value.put(key, val / getDurationInMinute());
+        }
     }
 
     @Override
     public Metrics toHour() {
-        final SumPerMinFunction metrics = (SumPerMinFunction) createNew();
+        final SumPerMinLabeledFunction metrics = (SumPerMinLabeledFunction) 
createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInHour());
         metrics.setServiceId(getServiceId());
@@ -98,7 +115,7 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
 
     @Override
     public Metrics toDay() {
-        final SumPerMinFunction metrics = (SumPerMinFunction) createNew();
+        final SumPerMinLabeledFunction metrics = (SumPerMinLabeledFunction) 
createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInDay());
         metrics.setServiceId(getServiceId());
@@ -107,14 +124,14 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
     }
 
     @Override
-    public int remoteHashCode() {
-        return getEntityId().hashCode();
+    protected String id0() {
+        return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
     }
 
     @Override
     public void deserialize(RemoteData remoteData) {
-        setTotal(remoteData.getDataLongs(0));
-        setTimeBucket(remoteData.getDataLongs(1));
+        setTotal(new DataTable(remoteData.getDataObjectStrings(0)));
+        setTimeBucket(remoteData.getDataLongs(0));
 
         setEntityId(remoteData.getDataStrings(0));
         setServiceId(remoteData.getDataStrings(1));
@@ -124,7 +141,7 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
     public RemoteData.Builder serialize() {
         final RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
 
-        remoteBuilder.addDataLongs(getTotal());
+        remoteBuilder.addDataObjectStrings(total.toStorageData());
         remoteBuilder.addDataLongs(getTimeBucket());
 
         remoteBuilder.addDataStrings(getEntityId());
@@ -133,33 +150,22 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
     }
 
     @Override
-    protected String id0() {
-        return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
-    }
-
-    @Override
-    public void accept(MeterEntity entity, Long value) {
-        setEntityId(entity.id());
-        setServiceId(entity.serviceId());
-        setTotal(getTotal() + value);
+    public int remoteHashCode() {
+        return getEntityId().hashCode();
     }
 
-    @Override
-    public Class<? extends StorageBuilder> builder() {
-        return SumPerMinStorageBuilder.class;
-    }
+    public static class SumPerMinLabeledStorageBuilder implements 
StorageBuilder<SumPerMinLabeledFunction> {
 
-    public static class SumPerMinStorageBuilder implements 
StorageBuilder<SumPerMinFunction> {
         @Override
-        public SumPerMinFunction storage2Entity(final Convert2Entity 
converter) {
-            final SumPerMinFunction metrics = new SumPerMinFunction() {
+        public SumPerMinLabeledFunction storage2Entity(Convert2Entity 
converter) {
+            final SumPerMinLabeledFunction metrics = new 
SumPerMinLabeledFunction() {
                 @Override
-                public AcceptableValue<Long> createNew() {
+                public AcceptableValue<DataTable> createNew() {
                     throw new UnexpectedException("createNew should not be 
called");
                 }
             };
-            metrics.setValue(((Number) converter.get(VALUE)).longValue());
-            metrics.setTotal(((Number) converter.get(TOTAL)).longValue());
+            metrics.setValue(new DataTable((String) converter.get(VALUE)));
+            metrics.setTotal(new DataTable((String) converter.get(TOTAL)));
             metrics.setTimeBucket(((Number) 
converter.get(TIME_BUCKET)).longValue());
             metrics.setServiceId((String) 
converter.get(InstanceTraffic.SERVICE_ID));
             metrics.setEntityId((String) converter.get(ENTITY_ID));
@@ -167,7 +173,7 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
         }
 
         @Override
-        public void entity2Storage(final SumPerMinFunction storageData, final 
Convert2Storage converter) {
+        public void entity2Storage(SumPerMinLabeledFunction storageData, 
Convert2Storage converter) {
             converter.accept(VALUE, storageData.getValue());
             converter.accept(TOTAL, storageData.getTotal());
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
@@ -175,22 +181,4 @@ public abstract class SumPerMinFunction extends Meter 
implements AcceptableValue
             converter.accept(ENTITY_ID, storageData.getEntityId());
         }
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof SumPerMinFunction)) {
-            return false;
-        }
-        final SumPerMinFunction function = (SumPerMinFunction) o;
-        return Objects.equals(getEntityId(), function.getEntityId())
-            && Objects.equals(getTimeBucket(), function.getTimeBucket());
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(getEntityId(), getTimeBucket());
-    }
-}
+}
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunctionTest.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunctionTest.java
new file mode 100644
index 0000000000..45c8b7920d
--- /dev/null
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunctionTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.skywalking.oap.server.core.analysis.meter.function.sumpermin;
+
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import 
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Map;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SumPerMinLabeledFunctionTest {
+
+    private SumPerMinLabeledFunctionInst function;
+    private DataTable table1;
+    private DataTable table2;
+
+    @BeforeClass
+    public static void setup() {
+        MeterEntity.setNamingControl(
+            new NamingControl(512, 512, 512, new EndpointNameGrouping()));
+    }
+
+    @Before
+    public void before() {
+        function = new SumPerMinLabeledFunctionInst();
+        
function.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+
+        table1 = new DataTable();
+        table1.put("200", 100L);
+        table1.put("300", 50L);
+
+        table2 = new DataTable();
+        table2.put("200", 120L);
+        table2.put("300", 17L);
+        table2.put("400", 77L);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        MeterEntity.setNamingControl(null);
+    }
+
+    @Test
+    public void testAccept() {
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table1);
+        function.calculate();
+        assertThat(function.getValue(), is(table1));
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table2);
+        function.calculate();
+        assertThat(function.getValue(), is(table1.append(table2)));
+    }
+
+    @Test
+    public void testCalculate() {
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table1);
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table2);
+        function.calculate();
+        assertThat(function.getValue(), is(table1.append(table2)));
+    }
+
+    @Test
+    public void testHour() {
+        
function.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table1);
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table2);
+        function.calculate();
+        final SumPerMinLabeledFunction hourFunction = 
(SumPerMinLabeledFunction) function.toHour();
+        hourFunction.calculate();
+        final DataTable result = new DataTable();
+        result.append(table1);
+        result.append(table2);
+        for (String key : result.keys()) {
+            result.put(key, result.get(key) / 60);
+        }
+        assertThat(hourFunction.getValue(), is(result));
+    }
+
+    @Test
+    public void testSerialize() {
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table1);
+        SumPerMinLabeledFunction function2 = 
Mockito.spy(SumPerMinLabeledFunction.class);
+        function2.deserialize(function.serialize().build());
+        assertThat(function2.getEntityId(), is(function.getEntityId()));
+        assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+    }
+
+    @Test
+    public void testBuilder() throws IllegalAccessException, 
InstantiationException {
+        function.accept(MeterEntity.newService("sum_sync_time", 
Layer.GENERAL), table1);
+        function.calculate();
+        StorageBuilder<SumPerMinLabeledFunction> storageBuilder = 
function.builder().newInstance();
+
+        final HashMapConverter.ToStorage toStorage = new 
HashMapConverter.ToStorage();
+        storageBuilder.entity2Storage(function, toStorage);
+        final Map<String, Object> map = toStorage.obtain();
+        map.put(SumPerMinLabeledFunction.VALUE, ((DataTable) 
map.get(SumPerMinLabeledFunction.VALUE)).toStorageData());
+        map.put(SumPerMinLabeledFunction.TOTAL, ((DataTable) 
map.get(SumPerMinLabeledFunction.TOTAL)).toStorageData());
+
+        SumPerMinLabeledFunction function2 = storageBuilder.storage2Entity(new 
HashMapConverter.ToEntity(map));
+        assertThat(function2.getValue(), is(function.getValue()));
+    }
+
+    private static class SumPerMinLabeledFunctionInst extends 
SumPerMinLabeledFunction {
+        @Override
+        public AcceptableValue<DataTable> createNew() {
+            return new SumPerMinLabeledFunctionInst();
+        }
+    }
+}
\ No newline at end of file

Reply via email to