This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new b5cd3fad7e Support `sumPerMinLabeled` in `MAL` (#9804)
b5cd3fad7e is described below
commit b5cd3fad7e21782a254b83d1fa67cdc14c391baf
Author: mrproliu <[email protected]>
AuthorDate: Wed Oct 19 13:50:58 2022 +0800
Support `sumPerMinLabeled` in `MAL` (#9804)
---
docs/en/changes/changes.md | 1 +
.../skywalking/oap/meter/analyzer/Analyzer.java | 5 +-
.../function/sumpermin/SumPerMinFunction.java | 2 +-
...Function.java => SumPerMinLabeledFunction.java} | 112 ++++++++--------
.../sumpermin/SumPerMinLabeledFunctionTest.java | 142 +++++++++++++++++++++
5 files changed, 197 insertions(+), 65 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 7c02662bcc..3cdd10eab8 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -62,6 +62,7 @@
is acceptable in our case.
* Optimize the query time of tasks in ProfileTaskCache.
* Fix metrics was put into wrong slot of the window in the alerting kernel.
+* Support `sumPerMinLabeled` in `MAL`.
#### UI
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index cf406a375e..90a0a9e83d 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -34,6 +34,7 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.CaseUtils;
import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
import org.apache.skywalking.oap.meter.analyzer.dsl.DownsamplingType;
import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
@@ -247,8 +248,8 @@ public class Analyzer {
private void createMetric(final ScopeType scopeType,
final String dataType,
final DownsamplingType downsamplingType) {
- String functionName = String.format(
- "%s%s", downsamplingType.toString().toLowerCase(), StringUtils.capitalize(dataType));
+ String downSamplingStr = CaseUtils.toCamelCase(downsamplingType.toString().toLowerCase(), false, '_');
+ String functionName = String.format("%s%s", downSamplingStr, StringUtils.capitalize(dataType));
meterSystem.create(metricName, functionName, scopeType);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
index 2d7aeb5c32..4774e06ce0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
@@ -43,7 +43,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import java.util.Objects;
@ToString
-@MeterFunction(functionName = "sum_per_min")
+@MeterFunction(functionName = "sumPerMin")
public abstract class SumPerMinFunction extends Meter implements AcceptableValue<Long>, LongValueHolder {
protected static final String VALUE = "value";
protected static final String TOTAL = "total";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunction.java
similarity index 68%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunction.java
index 2d7aeb5c32..d246370f9b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunction.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.analysis.meter.function.sumpermin;
import lombok.Getter;
import lombok.Setter;
-import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
@@ -28,11 +27,11 @@ import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
-import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LabeledValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
-import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@@ -42,11 +41,11 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import java.util.Objects;
-@ToString
-@MeterFunction(functionName = "sum_per_min")
-public abstract class SumPerMinFunction extends Meter implements AcceptableValue<Long>, LongValueHolder {
- protected static final String VALUE = "value";
- protected static final String TOTAL = "total";
+@MeterFunction(functionName = "sumPerMinLabeled")
+public abstract class SumPerMinLabeledFunction extends Meter implements AcceptableValue<DataTable>, LabeledValueHolder {
+
+ protected static final String VALUE = "datatable_value";
+ protected static final String TOTAL = "datatable_total";
@Setter
@Getter
@@ -61,34 +60,52 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
@Getter
@Setter
- @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
- private long value;
+ @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+ private DataTable value = new DataTable(30);
@Getter
@Setter
@Column(columnName = TOTAL, storageOnly = true)
- private long total;
+ private DataTable total = new DataTable(30);
@Entrance
- public final void combine(@SourceFrom long value) {
- this.total += value;
+ public final void combine(@SourceFrom DataTable value) {
+ this.total.append(value);
+ }
+
+ @Override
+ public void accept(MeterEntity entity, DataTable value) {
+ setEntityId(entity.id());
+ setServiceId(entity.serviceId());
+ this.total.append(value);
+ }
+
+ @Override
+ public Class<? extends StorageBuilder> builder() {
+ return SumPerMinLabeledStorageBuilder.class;
}
@Override
public boolean combine(Metrics metrics) {
- final SumPerMinFunction sumPerMinFunction = (SumPerMinFunction) metrics;
- combine(sumPerMinFunction.getTotal());
+ final SumPerMinLabeledFunction sumPerMinLabeledFunction = (SumPerMinLabeledFunction) metrics;
+ combine(sumPerMinLabeledFunction.getTotal());
return true;
}
@Override
public void calculate() {
- setValue(this.total / getDurationInMinute());
+ for (String key : total.keys()) {
+ final Long val = total.get(key);
+ if (Objects.isNull(val)) {
+ continue;
+ }
+ value.put(key, val / getDurationInMinute());
+ }
}
@Override
public Metrics toHour() {
- final SumPerMinFunction metrics = (SumPerMinFunction) createNew();
+ final SumPerMinLabeledFunction metrics = (SumPerMinLabeledFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setServiceId(getServiceId());
@@ -98,7 +115,7 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
@Override
public Metrics toDay() {
- final SumPerMinFunction metrics = (SumPerMinFunction) createNew();
+ final SumPerMinLabeledFunction metrics = (SumPerMinLabeledFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setServiceId(getServiceId());
@@ -107,14 +124,14 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
}
@Override
- public int remoteHashCode() {
- return getEntityId().hashCode();
+ protected String id0() {
+ return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
}
@Override
public void deserialize(RemoteData remoteData) {
- setTotal(remoteData.getDataLongs(0));
- setTimeBucket(remoteData.getDataLongs(1));
+ setTotal(new DataTable(remoteData.getDataObjectStrings(0)));
+ setTimeBucket(remoteData.getDataLongs(0));
setEntityId(remoteData.getDataStrings(0));
setServiceId(remoteData.getDataStrings(1));
@@ -124,7 +141,7 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
public RemoteData.Builder serialize() {
final RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
- remoteBuilder.addDataLongs(getTotal());
+ remoteBuilder.addDataObjectStrings(total.toStorageData());
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(getEntityId());
@@ -133,33 +150,22 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
}
@Override
- protected String id0() {
- return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
- }
-
- @Override
- public void accept(MeterEntity entity, Long value) {
- setEntityId(entity.id());
- setServiceId(entity.serviceId());
- setTotal(getTotal() + value);
+ public int remoteHashCode() {
+ return getEntityId().hashCode();
}
- @Override
- public Class<? extends StorageBuilder> builder() {
- return SumPerMinStorageBuilder.class;
- }
+ public static class SumPerMinLabeledStorageBuilder implements StorageBuilder<SumPerMinLabeledFunction> {
- public static class SumPerMinStorageBuilder implements StorageBuilder<SumPerMinFunction> {
@Override
- public SumPerMinFunction storage2Entity(final Convert2Entity converter) {
- final SumPerMinFunction metrics = new SumPerMinFunction() {
+ public SumPerMinLabeledFunction storage2Entity(Convert2Entity converter) {
+ final SumPerMinLabeledFunction metrics = new SumPerMinLabeledFunction() {
@Override
- public AcceptableValue<Long> createNew() {
+ public AcceptableValue<DataTable> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
- metrics.setValue(((Number) converter.get(VALUE)).longValue());
- metrics.setTotal(((Number) converter.get(TOTAL)).longValue());
+ metrics.setValue(new DataTable((String) converter.get(VALUE)));
+ metrics.setTotal(new DataTable((String) converter.get(TOTAL)));
metrics.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
metrics.setServiceId((String) converter.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) converter.get(ENTITY_ID));
@@ -167,7 +173,7 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
}
@Override
- public void entity2Storage(final SumPerMinFunction storageData, final Convert2Storage converter) {
+ public void entity2Storage(SumPerMinLabeledFunction storageData, Convert2Storage converter) {
converter.accept(VALUE, storageData.getValue());
converter.accept(TOTAL, storageData.getTotal());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
@@ -175,22 +181,4 @@ public abstract class SumPerMinFunction extends Meter implements AcceptableValue
converter.accept(ENTITY_ID, storageData.getEntityId());
}
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof SumPerMinFunction)) {
- return false;
- }
- final SumPerMinFunction function = (SumPerMinFunction) o;
- return Objects.equals(getEntityId(), function.getEntityId())
- && Objects.equals(getTimeBucket(), function.getTimeBucket());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getEntityId(), getTimeBucket());
- }
-}
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunctionTest.java
new file mode 100644
index 0000000000..45c8b7920d
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sumpermin/SumPerMinLabeledFunctionTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.sumpermin;
+
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Map;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SumPerMinLabeledFunctionTest {
+
+ private SumPerMinLabeledFunctionInst function;
+ private DataTable table1;
+ private DataTable table2;
+
+ @BeforeClass
+ public static void setup() {
+ MeterEntity.setNamingControl(
+ new NamingControl(512, 512, 512, new EndpointNameGrouping()));
+ }
+
+ @Before
+ public void before() {
+ function = new SumPerMinLabeledFunctionInst();
+ function.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+
+ table1 = new DataTable();
+ table1.put("200", 100L);
+ table1.put("300", 50L);
+
+ table2 = new DataTable();
+ table2.put("200", 120L);
+ table2.put("300", 17L);
+ table2.put("400", 77L);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ MeterEntity.setNamingControl(null);
+ }
+
+ @Test
+ public void testAccept() {
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table1);
+ function.calculate();
+ assertThat(function.getValue(), is(table1));
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table2);
+ function.calculate();
+ assertThat(function.getValue(), is(table1.append(table2)));
+ }
+
+ @Test
+ public void testCalculate() {
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table1);
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table2);
+ function.calculate();
+ assertThat(function.getValue(), is(table1.append(table2)));
+ }
+
+ @Test
+ public void testHour() {
+ function.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table1);
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table2);
+ function.calculate();
+ final SumPerMinLabeledFunction hourFunction = (SumPerMinLabeledFunction) function.toHour();
+ hourFunction.calculate();
+ final DataTable result = new DataTable();
+ result.append(table1);
+ result.append(table2);
+ for (String key : result.keys()) {
+ result.put(key, result.get(key) / 60);
+ }
+ assertThat(hourFunction.getValue(), is(result));
+ }
+
+ @Test
+ public void testSerialize() {
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table1);
+ SumPerMinLabeledFunction function2 = Mockito.spy(SumPerMinLabeledFunction.class);
+ function2.deserialize(function.serialize().build());
+ assertThat(function2.getEntityId(), is(function.getEntityId()));
+ assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+ }
+
+ @Test
+ public void testBuilder() throws IllegalAccessException, InstantiationException {
+ function.accept(MeterEntity.newService("sum_sync_time", Layer.GENERAL), table1);
+ function.calculate();
+ StorageBuilder<SumPerMinLabeledFunction> storageBuilder = function.builder().newInstance();
+
+ final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+ storageBuilder.entity2Storage(function, toStorage);
+ final Map<String, Object> map = toStorage.obtain();
+ map.put(SumPerMinLabeledFunction.VALUE, ((DataTable) map.get(SumPerMinLabeledFunction.VALUE)).toStorageData());
+ map.put(SumPerMinLabeledFunction.TOTAL, ((DataTable) map.get(SumPerMinLabeledFunction.TOTAL)).toStorageData());
+
+ SumPerMinLabeledFunction function2 = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(map));
+ assertThat(function2.getValue(), is(function.getValue()));
+ }
+
+ private static class SumPerMinLabeledFunctionInst extends SumPerMinLabeledFunction {
+ @Override
+ public AcceptableValue<DataTable> createNew() {
+ return new SumPerMinLabeledFunctionInst();
+ }
+ }
+}
\ No newline at end of file