This is an automated email from the ASF dual-hosted git repository.
wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 0a2ee589d5 Test: Add integration test for Shardingsphere-proxy (#9969)
0a2ee589d5 is described below
commit 0a2ee589d58717e0e3f6e49313c64f359a1842e8
Author: Wan Kai <[email protected]>
AuthorDate: Thu Nov 17 20:42:09 2022 +0800
Test: Add integration test for Shardingsphere-proxy (#9969)
* Fix `ShardingTopologyQueryDAO.loadServiceRelationsDetectedAtServerSide`
miss serviceIds.
* Add sharding integration test.
---
.github/workflows/skywalking.yaml | 33 +
docs/en/changes/changes.md | 1 +
.../storage-shardingsphere-plugin/pom.xml | 6 +
.../dao/ShardingTopologyQueryDAO.java | 2 +-
.../{mysql => }/DurationWithinTTLTest.java | 3 +-
.../jdbc/shardingsphere/ServiceCpmMetrics.java | 102 +++
.../shardingsphere/ServiceCpmMetricsBuilder.java | 46 ++
.../shardingsphere/ShardingIntegrationTest.java | 732 +++++++++++++++++++++
.../{mysql => }/ShardingRulesTest.java | 4 +-
.../test/resources/conf-mysql/config-sharding.yaml | 45 ++
.../src/test/resources/conf-mysql/server.yaml | 83 +++
.../src/test/resources/docker-compose-mysql.yml | 60 ++
.../src/test/resources/download-mysql.sh | 32 +
.../src/test/resources/log4j2.xml | 31 +
pom.xml | 1 +
15 files changed, 1175 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/skywalking.yaml
b/.github/workflows/skywalking.yaml
index 786640d32c..621883f209 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -271,6 +271,36 @@ jobs:
./mvnw clean verify -q -B -Dcheckstyle.skip -DskipUTs
-DskipITs=false || \
./mvnw clean verify -q -B -Dcheckstyle.skip -DskipUTs -DskipITs=false
+ it-sharding-proxy:
+ if: |
+ always() &&
+ ((github.event_name == 'schedule' && github.repository ==
'apache/skywalking') || needs.changes.outputs.oap == 'true')
+ name: IT Sharding-proxy
+ needs: [sanity-check, changes]
+ runs-on: ubuntu-latest
+ timeout-minutes: 60
+ strategy:
+ matrix:
+ java-version: [8, 11]
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ submodules: true
+ - name: Cache maven repository
+ uses: actions/cache@v3
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-maven-
+ - uses: actions/setup-java@v3
+ with:
+ java-version: ${{ matrix.java-version }}
+ distribution: adopt
+ - name: IT Sharding-proxy
+ run: |
+ ./mvnw clean test
-Dtest=org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.ShardingIntegrationTest
-DfailIfNoTests=false || \
+ ./mvnw clean test
-Dtest=org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.ShardingIntegrationTest
-DfailIfNoTests=false
+
e2e-test:
if: |
always() &&
@@ -715,6 +745,7 @@ jobs:
- dependency-license
- unit-test
- integration-test
+ - it-sharding-proxy
- e2e-test
- e2e-test-istio
- e2e-test-java-versions
@@ -733,6 +764,7 @@ jobs:
depLicenseResults=${{ needs.dependency-license.result }}
unitResults=${{ needs.unit-test.result }};
integrationResults=${{ needs.integration-test.result }};
+ itShardingResults=${{ needs.it-sharding-proxy.result }};
e2eResults=${{ needs.e2e-test.result }};
e2eIstioResults=${{ needs.e2e-test-istio.result }};
e2eJavaVersionResults=${{ needs.e2e-test-java-versions.result }};
@@ -743,5 +775,6 @@ jobs:
[[ ${e2eResults} == 'success' ]] || [[ ${execute} != 'true' &&
${e2eResults} == 'skipped' ]] || exit -5;
[[ ${e2eIstioResults} == 'success' ]] || [[ ${execute} != 'true' &&
${e2eIstioResults} == 'skipped' ]] || exit -6;
[[ ${e2eJavaVersionResults} == 'success' ]] || [[ ${execute} !=
'true' && ${e2eJavaVersionResults} == 'skipped' ]] || exit -7;
+ [[ ${itShardingResults} == 'success' ]] || [[ ${execute} != 'true'
&& ${itShardingResults} == 'skipped' ]] || exit -8;
exit 0;
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 4a9457e0ba..b52cfada6a 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -112,6 +112,7 @@
* Add `LongText` to support longer logs persistent as a text type in
ElasticSearch, instead of a keyword, to avoid length limitation.
* Fix wrong system variable name
`SW_CORE_ENABLE_ENDPOINT_NAME_GROUPING_BY_OPENAPI`. It was **opaenapi**.
* Fix not-time-series model blocking OAP boots in no-init mode.
+* Fix `ShardingTopologyQueryDAO.loadServiceRelationsDetectedAtServerSide`
invoke backend miss parameter `serviceIds`.
* Changed system variable `SW_SUPERDATASET_STORAGE_DAY_STEP` to
`SW_STORAGE_ES_SUPER_DATASET_DAY_STEP` to be consistent with other ES storage
related variables.
* Fix ESEventQueryDAO missing metric_table boolQuery criteria.
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/pom.xml
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/pom.xml
index a1b614020d..1c204849cc 100644
--- a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/pom.xml
@@ -45,5 +45,11 @@
<artifactId>storage-jdbc-hikaricp-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>8.0.13</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/dao/ShardingTopologyQueryDAO.java
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/dao/ShardingTopologyQueryDAO.java
index cf81ca06ce..40eb704e21 100644
---
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/dao/ShardingTopologyQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/dao/ShardingTopologyQueryDAO.java
@@ -35,7 +35,7 @@ public class ShardingTopologyQueryDAO extends
JDBCTopologyQueryDAO {
@Override
public List<Call.CallDetail>
loadServiceRelationsDetectedAtServerSide(Duration duration,
List<String> serviceIds) throws IOException {
- return
super.loadServiceRelationsDetectedAtServerSide(DurationWithinTTL.INSTANCE.getMetricDurationWithinTTL(duration));
+ return
super.loadServiceRelationsDetectedAtServerSide(DurationWithinTTL.INSTANCE.getMetricDurationWithinTTL(duration),
serviceIds);
}
@Override
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/mysql/DurationWithinTTLTest.java
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/DurationWithinTTLTest.java
similarity index 98%
rename from
oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/mysql/DurationWithinTTLTest.java
rename to
oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/DurationWithinTTLTest.java
index 92598b62c5..f45f871db7 100644
---
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/mysql/DurationWithinTTLTest.java
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/DurationWithinTTLTest.java
@@ -16,11 +16,10 @@
*
*/
-package
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.mysql;
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
-import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.DurationWithinTTL;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ServiceCpmMetrics.java
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ServiceCpmMetrics.java
new file mode 100644
index 0000000000..2982b5186b
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ServiceCpmMetrics.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.CPMMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@Stream(
+ name = ServiceCpmMetrics.INDEX_NAME,
+ scopeId = DefaultScopeDefine.SERVICE,
+ builder = ServiceCpmMetricsBuilder.class,
+ processor = MetricsStreamProcessor.class
+)
+@EqualsAndHashCode(of = {
+ "entityId"
+}, callSuper = true)
+public class ServiceCpmMetrics extends CPMMetrics {
+ public static final String INDEX_NAME = "service_cpm";
+
+ @Setter
+ @Getter
+ @Column(
+ columnName = "entity_id",
+ length = 512
+ )
+ private String entityId;
+
+ @Override
+ protected String id0() {
+ return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+ }
+
+ @Override
+ public int remoteHashCode() {
+ byte var1 = 17;
+ int var2 = 31 * var1 + this.entityId.hashCode();
+ return var2;
+ }
+
+ @Override
+ public RemoteData.Builder serialize() {
+ RemoteData.Builder var1 = RemoteData.newBuilder();
+ var1.addDataStrings(this.getEntityId());
+ var1.addDataLongs(this.getValue());
+ var1.addDataLongs(this.getTotal());
+ var1.addDataLongs(this.getTimeBucket());
+ return var1;
+ }
+
+ @Override
+ public void deserialize(RemoteData var1) {
+ this.setEntityId(var1.getDataStrings(0));
+ this.setValue(var1.getDataLongs(0));
+ this.setTotal(var1.getDataLongs(1));
+ this.setTimeBucket(var1.getDataLongs(2));
+ }
+
+ @Override
+ public Metrics toHour() {
+ ServiceCpmMetrics var1 = new ServiceCpmMetrics();
+ var1.setEntityId(this.getEntityId());
+ var1.setValue(this.getValue());
+ var1.setTotal(this.getTotal());
+ var1.setTimeBucket(this.toTimeBucketInHour());
+ return var1;
+ }
+
+ @Override
+ public Metrics toDay() {
+ ServiceCpmMetrics var1 = new ServiceCpmMetrics();
+ var1.setEntityId(this.getEntityId());
+ var1.setValue(this.getValue());
+ var1.setTotal(this.getTotal());
+ var1.setTimeBucket(this.toTimeBucketInDay());
+ return var1;
+ }
+}
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ServiceCpmMetricsBuilder.java
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ServiceCpmMetricsBuilder.java
new file mode 100644
index 0000000000..9269eb7822
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ServiceCpmMetricsBuilder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere;
+
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+public class ServiceCpmMetricsBuilder implements StorageBuilder {
+ public ServiceCpmMetricsBuilder() {
+ }
+
+ public void entity2Storage(StorageData var1, Convert2Storage var2) {
+ ServiceCpmMetrics var3 = (ServiceCpmMetrics) var1;
+ var2.accept("entity_id", var3.getEntityId());
+ var2.accept("value", new Long(var3.getValue()));
+ var2.accept("total", new Long(var3.getTotal()));
+ var2.accept("time_bucket", new Long(var3.getTimeBucket()));
+ }
+
+ public StorageData storage2Entity(Convert2Entity var1) {
+ ServiceCpmMetrics var2 = new ServiceCpmMetrics();
+ var2.setEntityId((String) var1.get("entity_id"));
+ var2.setValue(((Number) var1.get("value")).longValue());
+ var2.setTotal(((Number) var1.get("total")).longValue());
+ var2.setTimeBucket(((Number) var1.get("time_bucket")).longValue());
+ return var2;
+ }
+}
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
new file mode 100644
index 0000000000..c7ac504256
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import
org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
+import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import
org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
+import
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
+import org.apache.skywalking.oap.server.core.query.enumeration.Step;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.Entity;
+import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.Call;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.MetricsValues;
+import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
+import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
+import org.apache.skywalking.oap.server.core.query.type.TraceState;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
+import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
+import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
+import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCMetadataQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCMetricsDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCRecordDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCTagAutoCompleteQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLTableInstaller;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.dao.ShardingAggregationQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.dao.ShardingHistoryDeleteDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.dao.ShardingMetricsQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.dao.ShardingTopologyQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.dao.ShardingTraceQueryDAO;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.powermock.reflect.Whitebox;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Slf4j
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(Parameterized.class)
+@PowerMockIgnore({
+ "javax.net.*",
+ "javax.management.*",
+ "com.sun.org.apache.xerces.*",
+ "javax.xml.*", "org.xml.*",
+ "javax.management.*",
+ "org.w3c.*"
+})
+@PrepareForTest({DefaultScopeDefine.class})
+public class ShardingIntegrationTest {
+ @BeforeClass
+ public static void setup() {
+ PowerMockito.mockStatic(DefaultScopeDefine.class);
+ PowerMockito.when(DefaultScopeDefine.nameOf(1)).thenReturn("any");
+ }
+
+ @Parameterized.Parameter
+ public String version;
+
+ @Parameterized.Parameter(1)
+ public DataSourceType dsType;
+
+ @Parameterized.Parameters(name = "version: {0}")
+ public static Collection<Object[]> versions() {
+ return Arrays.asList(new Object[][] {
+ {
+ "5.1.2",
+ DataSourceType.MYSQL
+ }
+ });
+ }
+
+ public DockerComposeContainer<?> environment;
+ private JDBCHikariCPClient ssClient;
+ private JDBCHikariCPClient dsClient0;
+ private JDBCHikariCPClient dsClient1;
+ private final Set<String> dataSources = new
HashSet<>(Arrays.asList("ds_0", "ds_1"));
+ private final int ttlTestCreate = 3;
+ private final int ttlTestDrop = 2;
+ private final String searchableTag = "http.method";
+ private final long timeBucketSec =
Long.parseLong(DateTime.now().toString("yyyyMMddHHmmss"));
+ private final long timeBucketMin =
Long.parseLong(DateTime.now().toString("yyyyMMddHHmm"));
+ private final long timeBucketDay =
Long.parseLong(DateTime.now().toString("yyyyMMdd"));
+ private Duration duration;
+ private Entity entityA;
+ private Entity entityB;
+ private String serviceIdA;
+ private String serviceIdB;
+
+ private ModuleManager moduleManager;
+ private MySQLTableInstaller mySQLTableInstaller;
+ private ShardingSphereTableInstaller installer;
+ private DurationWithinTTL durationWithinTTL = DurationWithinTTL.INSTANCE;
+ private final String countQuery = "SELECT COUNT(*) AS rc FROM ";
+
+ @Before
+ public void init() {
+ if (dsType.equals(DataSourceType.MYSQL)) {
+ startEnv("docker-compose-mysql.yml", 3306);
+ initConnection("mysql", "/swtest?rewriteBatchedStatements=true",
3306, "root", "root@1234");
+ }
+ initTestData();
+ }
+
+ private void startEnv(String dockerComposeName, int dsServicePort) {
+ environment = new DockerComposeContainer<>(new
File(ShardingIntegrationTest.class
+
.getClassLoader()
+
.getResource(dockerComposeName).getPath()))
+ .withExposedService("sharding-proxy", 3307,
+
Wait.defaultWaitStrategy().withStartupTimeout(java.time.Duration.ofMinutes(20))
+ )
+ .withExposedService("data-source-0", dsServicePort,
+
Wait.defaultWaitStrategy().withStartupTimeout(java.time.Duration.ofMinutes(20))
+ )
+ .withExposedService("data-source-1", dsServicePort,
+
Wait.defaultWaitStrategy().withStartupTimeout(java.time.Duration.ofMinutes(20))
+ )
+ .withEnv("SS_VERSION", version);
+ environment.start();
+ }
+
+ private void initConnection(String driverType,
+ String urlSuffix,
+ int dsServicePort,
+ String dsUserName,
+ String dsPassword) {
+ String ssUrl = "jdbc:" + driverType + "://" +
+ environment.getServiceHost("sharding-proxy", 3307) + ":" +
+ environment.getServicePort("sharding-proxy", 3307) +
+ urlSuffix;
+ Properties properties = new Properties();
+ properties.setProperty("jdbcUrl", ssUrl);
+ properties.setProperty("dataSource.user", "root");
+ properties.setProperty("dataSource.password", "root");
+
+ String dsUrl0 = "jdbc:" + driverType + "://" +
+ environment.getServiceHost("data-source-0", dsServicePort) + ":" +
+ environment.getServicePort("data-source-0", dsServicePort) +
+ urlSuffix;
+ Properties propertiesDs0 = new Properties();
+ propertiesDs0.setProperty("jdbcUrl", dsUrl0);
+ propertiesDs0.setProperty("dataSource.user", dsUserName);
+ propertiesDs0.setProperty("dataSource.password", "root@1234");
+
+ String dsUrl1 = "jdbc:" + driverType + "://" +
+ environment.getServiceHost("data-source-1", dsServicePort) + ":" +
+ environment.getServicePort("data-source-1", dsServicePort) +
+ urlSuffix;
+ Properties propertiesDs1 = new Properties();
+ propertiesDs1.setProperty("jdbcUrl", dsUrl1);
+ propertiesDs1.setProperty("dataSource.user", dsUserName);
+ propertiesDs1.setProperty("dataSource.password", dsPassword);
+
+ ssClient = new JDBCHikariCPClient(properties);
+ dsClient0 = new JDBCHikariCPClient(propertiesDs0);
+ dsClient1 = new JDBCHikariCPClient(propertiesDs1);
+
+ ssClient.connect();
+ dsClient0.connect();
+ dsClient1.connect();
+ }
+
+ private void initTestData() {
+ moduleManager = mock(ModuleManager.class);
+ ConfigService configService = mock(ConfigService.class);
+ Whitebox.setInternalState(moduleManager, "isInPrepareStage", false);
+ when(configService.getMetricsDataTTL()).thenReturn(ttlTestCreate);
+ when(configService.getRecordDataTTL()).thenReturn(ttlTestCreate);
+
when(configService.getSearchableTracesTags()).thenReturn(searchableTag);
+
when(moduleManager.find(anyString())).thenReturn(mock(ModuleProviderHolder.class));
+
when(moduleManager.find(CoreModule.NAME).provider()).thenReturn(mock(ModuleServiceHolder.class));
+
when(moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class))
+ .thenReturn(configService);
+ mySQLTableInstaller = new MySQLTableInstaller(ssClient, moduleManager);
+ installer = new ShardingSphereTableInstaller(ssClient, moduleManager,
+ dataSources,
+ mySQLTableInstaller
+ );
+
+ durationWithinTTL.setConfigService(configService);
+
+ duration = new Duration();
+ duration.setStart(DateTime.now().minusMinutes(15).toString("yyyy-MM-dd
HHmm"));
+ duration.setEnd(DateTime.now().plusMinutes(1).toString("yyyy-MM-dd
HHmm"));
+ duration.setStep(Step.MINUTE);
+
+ entityA = new Entity();
+ entityA.setServiceName("Service_A");
+ entityA.setNormal(true);
+ entityA.setScope(Scope.Service);
+
+ entityB = new Entity();
+ entityB.setServiceName("Service_B");
+ entityB.setNormal(true);
+ entityB.setScope(Scope.Service);
+
+ serviceIdA = entityA.buildId();
+ serviceIdB = entityB.buildId();
+ }
+
+ @SneakyThrows
+ @After
+ public void after() {
+ environment.stop();
+ }
+
+ @SneakyThrows
+ @Test
+ public void test() {
+ trafficTest();
+ metricsTest();
+ tagsTest();
+ recordsTest();
+ topologyTest();
+ }
+
+ // @SQLDatabase.Sharding(
+ // shardingAlgorithm = ShardingAlgorithm.TIME_MIN_RANGE_SHARDING_ALGORITHM,
+ // tableShardingColumn = TIME_BUCKET,
+ // dataSourceShardingColumn = ID)
+ @SneakyThrows
+ private void trafficTest() {
+ log.info("Traffic test start...");
+ StorageModels models = new StorageModels();
+ models.add(
+ EndpointTraffic.class, DefaultScopeDefine.SERVICE,
+ new Storage(EndpointTraffic.INDEX_NAME, false,
DownSampling.Minute), false
+ );
+ Model model = models.allModels().get(0);
+ TableMetaInfo.addModel(model);
+
+ EndpointTraffic endpointTrafficA = new EndpointTraffic();
+ endpointTrafficA.setName("Endpoint_A");
+ endpointTrafficA.setServiceId(serviceIdA);
+ endpointTrafficA.setTimeBucket(timeBucketMin);
+ EndpointTraffic endpointTrafficB = new EndpointTraffic();
+ endpointTrafficB.setName("Endpoint_B");
+ endpointTrafficB.setServiceId(serviceIdB);
+ endpointTrafficB.setTimeBucket(timeBucketMin);
+ List<Metrics> metricsList = new ArrayList<>();
+ metricsList.add(endpointTrafficA);
+ metricsList.add(endpointTrafficB);
+
+ createShardingRuleTest(model);
+ updateShardingRuleTest(model);
+ createShardingTableTest(model);
+
+ insertMetrics(model, metricsList);
+ testDataSharding(endpointTrafficA);
+ ttlDropTest(model);
+
+ //Test traffic query
+ JDBCMetadataQueryDAO metadataQueryDAO = new
JDBCMetadataQueryDAO(ssClient, 100);
+ List<Endpoint> endpoints = metadataQueryDAO.findEndpoint("",
endpointTrafficA.getServiceId(), 100);
+ Assert.assertEquals(endpointTrafficA.getName(),
endpoints.get(0).getName());
+ log.info("Traffic test passed.");
+ }
+
+ // @SQLDatabase.Sharding(
+ // shardingAlgorithm =
ShardingAlgorithm.TIME_RELATIVE_ID_SHARDING_ALGORITHM,
+ // tableShardingColumn = ID,
+ // dataSourceShardingColumn = ENTITY_ID)
+ @SneakyThrows
+ private void metricsTest() {
+ log.info("Metrics test start...");
+ StorageModels models = new StorageModels();
+ models.add(
+ ServiceCpmMetrics.class, DefaultScopeDefine.SERVICE,
+ new Storage(ServiceCpmMetrics.INDEX_NAME, true,
DownSampling.Minute), false
+ );
+ Model model = models.allModels().get(0);
+ TableMetaInfo.addModel(model);
+
+ ServiceCpmMetrics serviceCpmMetricsA = new ServiceCpmMetrics();
+ serviceCpmMetricsA.setEntityId(serviceIdA);
+ serviceCpmMetricsA.setValue(100);
+ serviceCpmMetricsA.setTotal(100);
+ serviceCpmMetricsA.setTimeBucket(timeBucketMin);
+ ServiceCpmMetrics serviceCpmMetricsB = new ServiceCpmMetrics();
+ serviceCpmMetricsB.setEntityId(serviceIdB);
+ serviceCpmMetricsB.setValue(200);
+ serviceCpmMetricsB.setTotal(200);
+ serviceCpmMetricsB.setTimeBucket(timeBucketMin);
+
+ createShardingRuleTest(model);
+ updateShardingRuleTest(model);
+ createShardingTableTest(model);
+
+ insertMetrics(model, Arrays.asList(serviceCpmMetricsA,
serviceCpmMetricsB));
+ testDataSharding(serviceCpmMetricsA);
+
+ ttlDropTest(model);
+
+ //Test topN
+ ShardingAggregationQueryDAO aggregationQueryDAO = new
ShardingAggregationQueryDAO(ssClient);
+ TopNCondition topNCondition = new TopNCondition();
+ topNCondition.setName(ServiceCpmMetrics.INDEX_NAME);
+ topNCondition.setTopN(1);
+ topNCondition.setOrder(Order.DES);
+
+ SelectedRecord top1Record =
aggregationQueryDAO.sortMetrics(topNCondition, "value", duration, null).get(0);
+ Assert.assertEquals(serviceCpmMetricsB.getEntityId(),
top1Record.getId());
+ Assert.assertEquals("200.0000", top1Record.getValue());
+
+ //Test metrics query
+ ShardingMetricsQueryDAO metricsQueryDAO = new
ShardingMetricsQueryDAO(ssClient);
+ MetricsCondition metricsCondition = new MetricsCondition();
+
+ metricsCondition.setName(ServiceCpmMetrics.INDEX_NAME);
+ metricsCondition.setEntity(entityA);
+
+ long value = metricsQueryDAO.readMetricsValue(metricsCondition,
"value", duration);
+ Assert.assertEquals(serviceCpmMetricsA.getValue(), value);
+
+ MetricsValues values =
metricsQueryDAO.readMetricsValues(metricsCondition, "value", duration);
+ String metricsId = serviceCpmMetricsA.getTimeBucket() + "_" +
serviceCpmMetricsA.getEntityId();
+ Assert.assertEquals(serviceCpmMetricsA.getValue(),
values.getValues().findValue(metricsId, 0));
+ log.info("Metrics test passed.");
+ }
+
+ // Verifies TagAutocompleteData end-to-end on the proxy: un-sharded storage
+ // (NO_SHARDING), key/value autocomplete queries, and TTL-driven deletion.
+ // @SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.NO_SHARDING)
+ @SneakyThrows
+ private void tagsTest() {
+ log.info("Tag auto complete data test start...");
+ StorageModels models = new StorageModels();
+ models.add(
+ TagAutocompleteData.class, DefaultScopeDefine.SEGMENT,
+ new Storage(TagAutocompleteData.INDEX_NAME, true,
DownSampling.Minute), false
+ );
+ Model model = models.allModels().get(0);
+ TableMetaInfo.addModel(model);
+ installer.createTable(model);
+ TagAutocompleteData tagData1 = new TagAutocompleteData();
+ tagData1.setTagType(TagType.TRACE.name());
+ tagData1.setTagKey(searchableTag);
+ tagData1.setTagValue("GET");
+ tagData1.setTimeBucket(timeBucketMin);
+ TagAutocompleteData tagData2 = new TagAutocompleteData();
+ tagData2.setTagType(TagType.TRACE.name());
+ tagData2.setTagKey(searchableTag);
+ tagData2.setTagValue("POST");
+ // Current time bucket — inside the TTL window, so this row survives the
+ // historyDelete() below (the final count asserts 2 rows remain).
+ tagData2.setTimeBucket(timeBucketMin);
+ TagAutocompleteData tagData3 = new TagAutocompleteData();
+ tagData3.setTagType(TagType.TRACE.name());
+ tagData3.setTagKey(searchableTag);
+ tagData3.setTagValue("HEAD");
+ //Should be deleted after TTL process
+
tagData3.setTimeBucket(Long.parseLong(DateTime.now().minusDays(5).toString("yyyyMMddHHmm")));
+ insertMetrics(model, Arrays.asList(tagData1, tagData2, tagData3));
+
+ //Test total count
+ try (Connection ssConn = ssClient.getConnection()) {
+ ResultSet rs = ssClient.executeQuery(ssConn, countQuery +
TagAutocompleteData.INDEX_NAME);
+ rs.next();
+ Assert.assertEquals(3, rs.getInt("rc"));
+ }
+
+ // Test query
+ JDBCTagAutoCompleteQueryDAO tagQueryDAO = new
JDBCTagAutoCompleteQueryDAO(ssClient);
+ Set<String> tagKeys =
tagQueryDAO.queryTagAutocompleteKeys(TagType.TRACE, 10, duration);
+ Assert.assertEquals(searchableTag, tagKeys.iterator().next());
+ Set<String> tagValues =
tagQueryDAO.queryTagAutocompleteValues(TagType.TRACE, searchableTag, 10,
duration);
+ // Only GET and POST fall inside `duration`; the 5-day-old HEAD row does not.
+ Assert.assertEquals(2, tagValues.size());
+
+ // Test TTL
+ historyDelete(model);
+ try (Connection ssConn = ssClient.getConnection()) {
+ ResultSet rs = ssClient.executeQuery(ssConn, countQuery +
TagAutocompleteData.INDEX_NAME);
+ rs.next();
+ Assert.assertEquals(2, rs.getInt("rc"));
+ }
+ log.info("Tag auto complete data test passed.");
+ }
+
+ // Verifies record (trace segment) storage on the proxy: sharding rule
+ // creation/alteration, per-data-source table layout, data distribution,
+ // TTL table dropping, and trace queries by condition and by trace id.
+ // @SQLDatabase.Sharding(
+ // shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM,
+ // dataSourceShardingColumn = SERVICE_ID,
+ // tableShardingColumn = TIME_BUCKET)
+ @SneakyThrows
+ private void recordsTest() {
+ log.info("Records (Trace) test start...");
+ StorageModels models = new StorageModels();
+ models.add(
+ SegmentRecord.class, DefaultScopeDefine.SEGMENT,
+ new Storage(SegmentRecord.INDEX_NAME, false, DownSampling.Second), true
+ );
+ Model model = models.allModels().get(0);
+ TableMetaInfo.addModel(model);
+ Tag tag = new Tag(searchableTag, "GET");
+ List<String> tags = Collections.singletonList(tag.toString());
+ // Two segments of one trace, owned by different services, so the rows
+ // spread across both data sources (SERVICE_ID is the sharding column).
+ SegmentRecord segmentRecordA = new SegmentRecord();
+ segmentRecordA.setSegmentId("segmentA");
+ segmentRecordA.setTraceId("traceA");
+ segmentRecordA.setServiceId(serviceIdA);
+ segmentRecordA.setTimeBucket(timeBucketSec);
+ segmentRecordA.setTags(tags);
+ segmentRecordA.setStartTime(DateTime.now().getMillis());
+ segmentRecordA.setEndpointId(IDManager.EndpointID.buildId(segmentRecordA.getServiceId(), "Endpoint_A"));
+ SegmentRecord segmentRecordB = new SegmentRecord();
+ segmentRecordB.setSegmentId("segmentB");
+ segmentRecordB.setTraceId("traceA");
+ segmentRecordB.setServiceId(serviceIdB);
+ segmentRecordB.setTimeBucket(timeBucketSec);
+ segmentRecordB.setTags(tags);
+ segmentRecordB.setStartTime(DateTime.now().getMillis());
+ // BUGFIX: this endpoint id was previously assigned to segmentRecordA
+ // (copy-paste slip), which overwrote its "Endpoint_A" value and left
+ // segmentRecordB without any endpoint id.
+ segmentRecordB.setEndpointId(IDManager.EndpointID.buildId(segmentRecordB.getServiceId(), "Endpoint_B"));
+
+ createShardingRuleTest(model);
+ updateShardingRuleTest(model);
+ createShardingTableTest(model);
+ insertRecords(model, Arrays.asList(segmentRecordA, segmentRecordB));
+ testDataSharding(segmentRecordA);
+ ttlDropTest(model);
+
+ //Test trace query
+ ShardingTraceQueryDAO traceQueryDAO = new ShardingTraceQueryDAO(moduleManager, ssClient);
+ TraceBrief traceBrief = traceQueryDAO.queryBasicTraces(
+ duration, 0, 0,
+ segmentRecordA.getServiceId(), null,
+ null, null, 10, 0,
+ TraceState.SUCCESS, QueryOrder.BY_START_TIME, Collections.singletonList(tag)
+ );
+ // Filtering by serviceIdA must return only segmentA.
+ Assert.assertEquals(segmentRecordA.getSegmentId(), traceBrief.getTraces().get(0).getSegmentId());
+
+ // Querying by trace id crosses both shards and returns both segments.
+ List<SegmentRecord> segmentRecords = traceQueryDAO.queryByTraceId(segmentRecordA.getTraceId());
+ Assert.assertEquals(2, segmentRecords.size());
+ log.info("Records (Trace) test passed.");
+ }
+
+ // Verifies topology metrics (server- and client-side service relations):
+ // sharding rule lifecycle, data distribution across data sources, TTL
+ // drops, and topology queries through ShardingTopologyQueryDAO.
+ // @SQLDatabase.Sharding(
+ // shardingAlgorithm = ShardingAlgorithm.TIME_BUCKET_SHARDING_ALGORITHM,
+ // tableShardingColumn = TIME_BUCKET,
+ // dataSourceShardingColumn = ENTITY_ID)
+ @SneakyThrows
+ private void topologyTest() {
+ log.info("Topology test start...");
+ StorageModels models = new StorageModels();
+ models.add(
+ ServiceRelationServerSideMetrics.class, DefaultScopeDefine.SERVICE_RELATION,
+ new Storage(ServiceRelationServerSideMetrics.INDEX_NAME, true, DownSampling.Minute), false
+ );
+ models.add(
+ ServiceRelationClientSideMetrics.class, DefaultScopeDefine.SERVICE_RELATION,
+ new Storage(ServiceRelationClientSideMetrics.INDEX_NAME, true, DownSampling.Minute), false
+ );
+ Model serverModel = models.allModels().get(0);
+ Model clientModel = models.allModels().get(1);
+ TableMetaInfo.addModel(serverModel);
+ TableMetaInfo.addModel(clientModel);
+
+ // Relation A: HTTP_Client -----> Service_A, detected at both sides.
+ ServiceRelationServerSideMetrics serverSideMetricsA = new ServiceRelationServerSideMetrics();
+ String clientIdA = IDManager.ServiceID.buildId("HTTP_Client", false);
+ serverSideMetricsA.setSourceServiceId(clientIdA);
+ serverSideMetricsA.setDestServiceId(serviceIdA);
+ serverSideMetricsA.setEntityId(IDManager.ServiceID.buildRelationId(
+ new IDManager.ServiceID.ServiceRelationDefine(clientIdA, serviceIdA)));
+ serverSideMetricsA.setComponentId(0);
+ serverSideMetricsA.setTimeBucket(timeBucketMin);
+ ServiceRelationClientSideMetrics clientSideMetricsA = new ServiceRelationClientSideMetrics();
+ clientSideMetricsA.setSourceServiceId(clientIdA);
+ clientSideMetricsA.setDestServiceId(serviceIdA);
+ clientSideMetricsA.setEntityId(IDManager.ServiceID.buildRelationId(
+ new IDManager.ServiceID.ServiceRelationDefine(clientIdA, serviceIdA)));
+ clientSideMetricsA.setComponentId(0);
+ clientSideMetricsA.setTimeBucket(timeBucketMin);
+
+ // Relation B: Service_A -----> Service_B, detected at both sides.
+ ServiceRelationServerSideMetrics serverSideMetricsB = new ServiceRelationServerSideMetrics();
+ serverSideMetricsB.setSourceServiceId(serviceIdA);
+ serverSideMetricsB.setDestServiceId(serviceIdB);
+ serverSideMetricsB.setEntityId(IDManager.ServiceID.buildRelationId(
+ new IDManager.ServiceID.ServiceRelationDefine(serviceIdA, serviceIdB)));
+ serverSideMetricsB.setComponentId(0);
+ serverSideMetricsB.setTimeBucket(timeBucketMin);
+ ServiceRelationClientSideMetrics clientSideMetricsB = new ServiceRelationClientSideMetrics();
+ // BUGFIX: source/dest were previously copied from relation A
+ // (clientIdA -> serviceIdA), contradicting this row's entity id which is
+ // the serviceIdA -> serviceIdB relation.
+ clientSideMetricsB.setSourceServiceId(serviceIdA);
+ clientSideMetricsB.setDestServiceId(serviceIdB);
+ clientSideMetricsB.setEntityId(IDManager.ServiceID.buildRelationId(
+ new IDManager.ServiceID.ServiceRelationDefine(serviceIdA, serviceIdB)));
+ clientSideMetricsB.setComponentId(0);
+ clientSideMetricsB.setTimeBucket(timeBucketMin);
+
+ createShardingRuleTest(serverModel);
+ createShardingRuleTest(clientModel);
+ updateShardingRuleTest(serverModel);
+ updateShardingRuleTest(clientModel);
+ createShardingTableTest(serverModel);
+ createShardingTableTest(clientModel);
+
+ insertMetrics(serverModel, Arrays.asList(serverSideMetricsA, serverSideMetricsB));
+ insertMetrics(clientModel, Arrays.asList(clientSideMetricsA, clientSideMetricsB));
+ testDataSharding(serverSideMetricsA);
+ testDataSharding(clientSideMetricsA);
+ ttlDropTest(serverModel);
+ ttlDropTest(clientModel);
+
+ //Test topology query
+ ShardingTopologyQueryDAO queryDAO = new ShardingTopologyQueryDAO(ssClient);
+ List<Call.CallDetail> callDetailsServerSide = queryDAO.loadServiceRelationsDetectedAtServerSide(
+ duration, Arrays.asList(serviceIdB));
+ //Service_A -----> Service_B
+ Assert.assertEquals(serviceIdB, callDetailsServerSide.get(0).getTarget());
+ List<Call.CallDetail> callDetailsClientSide = queryDAO.loadServiceRelationDetectedAtClientSide(
+ duration, Arrays.asList(serviceIdA));
+ //HTTP_Client -----> Service_A -----> Service_B
+ Assert.assertEquals(2, callDetailsClientSide.size());
+ log.info("Topology test passed.");
+ }
+
+ // Registers the model's sharding rule (first registration issues CREATE)
+ // and asserts the rule stored by the proxy equals the in-memory one.
+ // Whitebox reflection is used to reach the operator's private rule cache.
+ @SneakyThrows
+ private void createShardingRuleTest(Model model) {
+ ShardingRulesOperator.INSTANCE.createOrUpdateShardingRule(ssClient,
model, dataSources, ttlTestCreate);
+ Map<String, ShardingRule> shardingRules = Whitebox.getInternalState(
+ ShardingRulesOperator.INSTANCE, "modelShardingRules");
+ ShardingRule inputRule =
shardingRules.get(model.getName()).toBuilder().build();
+ ShardingRule outPutRule = loadShardingRule(model);
+ outPutRule.setOperation("CREATE");
+ //The rules in the database erased all `"`
+ Assert.assertEquals(inputRule.toShardingRuleSQL().replaceAll("\"",
""), outPutRule.toShardingRuleSQL());
+ }
+
+ // Re-registers the model's sharding rule (second registration issues
+ // ALTER) and asserts the persisted rule still equals the in-memory one.
+ private void updateShardingRuleTest(Model model) throws StorageException {
+ ShardingRulesOperator.INSTANCE.createOrUpdateShardingRule(ssClient,
model, dataSources, ttlTestCreate);
+ Map<String, ShardingRule> shardingRules = Whitebox.getInternalState(
+ ShardingRulesOperator.INSTANCE, "modelShardingRules");
+ ShardingRule inputRule =
shardingRules.get(model.getName()).toBuilder().build();
+ ShardingRule outPutRule = loadShardingRule(model);
+ outPutRule.setOperation("ALTER");
+ //The rules in the database erased all `"`
+ Assert.assertEquals(inputRule.toShardingRuleSQL().replaceAll("\"",
""), outPutRule.toShardingRuleSQL());
+ }
+
+ /**
+ * Load sharding rule from the database, return a clone rule for comparing.
+ * Forces the operator to re-read rules from the proxy (private
+ * initShardingRules via Whitebox), then returns a defensive copy built
+ * from the refreshed cache so callers can mutate it freely.
+ */
+ @SneakyThrows
+ private ShardingRule loadShardingRule(Model model) {
+ Whitebox.invokeMethod(ShardingRulesOperator.INSTANCE,
"initShardingRules", ssClient);
+ Map<String, ShardingRule> shardingRules = Whitebox.getInternalState(
+ ShardingRulesOperator.INSTANCE, "modelShardingRules");
+ return shardingRules.get(model.getName()).toBuilder().build();
+ }
+
+ // Creates the model's table through the installer, then checks that the
+ // expected per-day physical tables exist on both backing data sources.
+ @SneakyThrows
+ private void createShardingTableTest(Model model) {
+ installer.createTable(model);
+ existsTest(dsClient0, model);
+ existsTest(dsClient1, model);
+ }
+
+ // Asserts via JDBC metadata that the daily physical tables
+ // (<model>_yyyyMMdd, from two days ago through tomorrow) exist on the
+ // given data source.
+ @SneakyThrows
+ private void existsTest(JDBCHikariCPClient dsClient, Model model) {
+ //TTL is 3 so just test 4 tables
+ List<String> tables = new ArrayList<>();
+ tables.add(model.getName() + "_" +
DateTime.now().minusDays(2).toString("yyyyMMdd"));
+ tables.add(model.getName() + "_" +
DateTime.now().minusDays(1).toString("yyyyMMdd"));
+ tables.add(model.getName() + "_" +
DateTime.now().toString("yyyyMMdd"));
+ tables.add(model.getName() + "_" +
DateTime.now().plusDays(1).toString("yyyyMMdd"));
+ try (Connection conn = dsClient.getConnection()) {
+ for (String name : tables) {
+ ResultSet rset =
conn.getMetaData().getTables(conn.getCatalog(), null, name, null);
+ // rset.next() == true means at least one table matched the name.
+ Assert.assertTrue(rset.next());
+ }
+ }
+ }
+
+ // Inserts each metric through JDBCMetricsDAO, instantiating the storage
+ // builder declared by the metric class's @Stream annotation via reflection
+ // so the DAO can serialize the concrete metric type.
+ @SneakyThrows
+ private void insertMetrics(Model model, List<Metrics> metricsList) {
+ try (Connection conn = ssClient.getConnection()) {
+ for (Metrics metrics : metricsList) {
+ JDBCMetricsDAO jdbcMetricsDAO = new JDBCMetricsDAO(ssClient,
metrics.getClass()
+
.getAnnotation(Stream.class)
+
.builder()
+
.getDeclaredConstructor()
+
.newInstance());
+ jdbcMetricsDAO.prepareBatchInsert(model, metrics).invoke(conn);
+ }
+ }
+ }
+
+ // Inserts each record through JDBCRecordDAO, using the same
+ // @Stream-declared builder reflection as insertMetrics; the prepared
+ // insert is cast to SQLExecutor to run it against the proxy connection.
+ @SneakyThrows
+ private void insertRecords(Model model, List<Record> recordList) {
+ try (Connection conn = ssClient.getConnection()) {
+ for (Record record : recordList) {
+ JDBCRecordDAO jdbcRecordDAO = new JDBCRecordDAO(ssClient,
record.getClass()
+
.getAnnotation(Stream.class)
+
.builder()
+
.getDeclaredConstructor()
+
.newInstance());
+ ((SQLExecutor) jdbcRecordDAO.prepareBatchInsert(model,
record)).invoke(conn);
+ }
+ }
+ }
+
+ // Asserts the two rows inserted for `data`'s logical index are visible
+ // through the proxy (count == 2) and were split one-per-data-source
+ // (count == 1 on each backing MySQL's daily physical table).
+ @SneakyThrows
+ private void testDataSharding(StorageData data) {
+ try (Connection ssConn = ssClient.getConnection();
+ Connection ds0Conn = dsClient0.getConnection();
+ Connection ds1Conn = dsClient1.getConnection()) {
+
+ String logicIndex =
data.getClass().getAnnotation(Stream.class).name();
+ String physicalIndex = logicIndex + "_" + timeBucketDay;
+
+ ResultSet logicSet = ssClient.executeQuery(ssConn, countQuery +
logicIndex);
+ logicSet.next();
+ Assert.assertEquals(2, logicSet.getInt("rc"));
+
+ ResultSet physicalSet0 = dsClient0.executeQuery(ds0Conn,
countQuery + physicalIndex);
+ physicalSet0.next();
+ Assert.assertEquals(1, physicalSet0.getInt("rc"));
+
+ ResultSet physicalSet1 = dsClient1.executeQuery(ds1Conn,
countQuery + physicalIndex);
+ physicalSet1.next();
+ Assert.assertEquals(1, physicalSet1.getInt("rc"));
+
+ if
(data.getClass().isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class))
{
+ // NOTE(review): additionalLogicIndex/additionalPhysicalIndex are
+ // computed identically to logicIndex/physicalIndex above, so this
+ // branch re-checks the same tables. Presumably they should be
+ // derived from the additional-entity table name declared by
+ // @SQLDatabase.ExtraColumn4AdditionalEntity — TODO confirm.
+ String additionalLogicIndex =
data.getClass().getAnnotation(Stream.class).name();
+ String additionalPhysicalIndex = logicIndex + "_" +
timeBucketDay;
+
+ ResultSet additionalLogicSet = ssClient.executeQuery(
+ ssConn, countQuery + additionalLogicIndex);
+ additionalLogicSet.next();
+ Assert.assertEquals(2, additionalLogicSet.getInt("rc"));
+
+ ResultSet additionalPhysicalSet0 = dsClient0.executeQuery(
+ ds0Conn, countQuery + additionalPhysicalIndex);
+ additionalPhysicalSet0.next();
+ Assert.assertEquals(1, additionalPhysicalSet0.getInt("rc"));
+
+ ResultSet additionalPhysicalSet1 = dsClient1.executeQuery(
+ ds1Conn, countQuery + additionalPhysicalIndex);
+ additionalPhysicalSet1.next();
+ Assert.assertEquals(1, additionalPhysicalSet1.getInt("rc"));
+ }
+ }
+ }
+
+ // Runs the history-delete (TTL) process, then asserts the physical table
+ // for two days ago has been dropped on both backing data sources.
+ @SneakyThrows
+ private void ttlDropTest(Model model) {
+ historyDelete(model);
+
+ String droppedTable = model.getName() + "_" + DateTime.now().minusDays(2).toString("yyyyMMdd");
+ // FIX: the previous version opened four connections (two per data
+ // source, one of each discarded immediately) and closed none of them;
+ // use try-with-resources with a single connection per data source.
+ try (Connection conn0 = dsClient0.getConnection();
+ Connection conn1 = dsClient1.getConnection()) {
+ ResultSet rset0 = conn0.getMetaData()
+ .getTables(conn0.getCatalog(), null, droppedTable, null);
+ // No row in the metadata result means the table no longer exists.
+ Assert.assertFalse(rset0.next());
+ ResultSet rset1 = conn1.getMetaData()
+ .getTables(conn1.getCatalog(), null, droppedTable, null);
+ Assert.assertFalse(rset1.next());
+ }
+ }
+
+ // Invokes ShardingHistoryDeleteDAO to delete data older than the
+ // ttlTestDrop window, keyed on the TIME_BUCKET column.
+ @SneakyThrows
+ private void historyDelete(Model model) {
+ ShardingHistoryDeleteDAO deleteDAO = new ShardingHistoryDeleteDAO(
+ ssClient, dataSources, moduleManager, mySQLTableInstaller);
+ deleteDAO.deleteHistory(model, Metrics.TIME_BUCKET, ttlTestDrop);
+ }
+
+ // Backing database flavor for the integration test; only MySQL is
+ // exercised by this suite so far.
+ private enum DataSourceType {
+ MYSQL
+ }
+}
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/mysql/ShardingRulesTest.java
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingRulesTest.java
similarity index 96%
rename from
oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/mysql/ShardingRulesTest.java
rename to
oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingRulesTest.java
index 6bbafef031..999f6f6d9a 100644
---
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/mysql/ShardingRulesTest.java
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingRulesTest.java
@@ -16,13 +16,11 @@
*
*/
-package
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.mysql;
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
-import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.ShardingRule;
-import
org.apache.skywalking.oap.server.storage.plugin.jdbc.shardingsphere.ShardingRulesOperator;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.junit.Assert;
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/conf-mysql/config-sharding.yaml
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/conf-mysql/config-sharding.yaml
new file mode 100644
index 0000000000..c7b336bd52
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/conf-mysql/config-sharding.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################################################################################################
+#
+# Here you can configure the rules for the proxy.
+# This example is configuration of sharding rule.
+#
+######################################################################################################
+#
+databaseName: swtest
+
+dataSources:
+ ds_0:
+ url: jdbc:mysql://data-source-0:3306/swtest?allowPublicKeyRetrieval=true
+ username: root
+ password: root@1234
+ connectionTimeoutMilliseconds: 30000
+ idleTimeoutMilliseconds: 60000
+ maxLifetimeMilliseconds: 1800000
+ maxPoolSize: 50
+ minPoolSize: 1
+ ds_1:
+ url: jdbc:mysql://data-source-1:3306/swtest?allowPublicKeyRetrieval=true
+ username: root
+ password: root@1234
+ connectionTimeoutMilliseconds: 30000
+ idleTimeoutMilliseconds: 60000
+ maxLifetimeMilliseconds: 1800000
+ maxPoolSize: 50
+ minPoolSize: 1
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/conf-mysql/server.yaml
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/conf-mysql/server.yaml
new file mode 100644
index 0000000000..dc66c7cfe4
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/conf-mysql/server.yaml
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################################################################################################
+#
+# If you want to configure governance, authorization and proxy properties,
please refer to this file.
+#
+######################################################################################################
+
+#mode:
+# type: Cluster
+# repository:
+# type: ZooKeeper
+# props:
+# namespace: governance_ds
+# server-lists: localhost:2181
+# retryIntervalMilliseconds: 500
+# timeToLiveSeconds: 60
+# maxRetries: 3
+# operationTimeoutMilliseconds: 500
+# overwrite: false
+#
+rules:
+ - !AUTHORITY
+ users:
+ - root@:root
+ provider:
+ type: ALL_PERMITTED
+ - !TRANSACTION
+ defaultType: XA
+ providerType: Atomikos
+ # When the provider type is Narayana, the following properties can be
configured or not
+ props:
+ recoveryStoreUrl: jdbc:mysql://127.0.0.1:3306/jbossts
+ recoveryStoreDataSource: com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+ recoveryStoreUser: root
+ recoveryStorePassword: 12345678
+ - !SQL_PARSER
+ sqlCommentParseEnabled: true
+ sqlStatementCache:
+ initialCapacity: 2000
+ maximumSize: 65535
+ parseTreeCache:
+ initialCapacity: 128
+ maximumSize: 1024
+
+props:
+ max-connections-size-per-query: 1
+ kernel-executor-size: 16 # Infinite by default.
+ proxy-frontend-flush-threshold: 128 # The default value is 128.
+ proxy-hint-enabled: true
+ sql-show: false
+ check-table-metadata-enabled: false
+ show-process-list-enabled: false
+ # Proxy backend query fetch size. A larger value may increase the memory
usage of ShardingSphere Proxy.
+ # The default value is -1, which means set the minimum value for different
JDBC drivers.
+ proxy-backend-query-fetch-size: -1
+ check-duplicate-table-enabled: false
+ proxy-frontend-executor-size: 0 # Proxy frontend executor size. The default
value is 0, which means let Netty decide.
+ # Available options of proxy backend executor suitable: OLAP(default),
OLTP. The OLTP option may reduce time cost of writing packets to client, but it
may increase the latency of SQL execution
+ # and block other clients if client connections are more than
`proxy-frontend-executor-size`, especially executing slow SQL.
+ proxy-backend-executor-suitable: OLAP
+ proxy-frontend-max-connections: 0 # Less than or equal to 0 means no
limitation.
+ sql-federation-enabled: false
+ # Available proxy backend driver type: JDBC (default), ExperimentalVertx
+ proxy-backend-driver-type: JDBC
+ proxy-mysql-default-version: 5.7.22 # In the absence of schema name, the
default version will be used.
+ proxy-default-port: 3307 # Proxy default port.
+ proxy-netty-backlog: 1024 # Proxy netty backlog.
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/docker-compose-mysql.yml
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/docker-compose-mysql.yml
new file mode 100644
index 0000000000..e3227a2523
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/docker-compose-mysql.yml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ sharding-proxy:
+ image: apache/shardingsphere-proxy:${SS_VERSION}
+ volumes:
+ - ./download-mysql.sh:/opt/shardingsphere-proxy/download-mysql.sh
+ - ./conf-mysql:/opt/shardingsphere-proxy/conf
+ networks:
+ - e2e
+ expose:
+ - 3307
+ entrypoint: ['sh', '-c', '/opt/shardingsphere-proxy/download-mysql.sh
/opt/shardingsphere-proxy/ext-lib && /opt/shardingsphere-proxy/bin/start.sh
3307 && tail -f /opt/shardingsphere-proxy/logs/stdout.log']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/3307"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ data-source-0:
+ condition: service_healthy
+ data-source-1:
+ condition: service_healthy
+
+ data-source-0: &ds
+ image: mysql/mysql-server:8.0.13
+ networks:
+ - e2e
+ expose:
+ - 3306
+ environment:
+ MYSQL_ROOT_PASSWORD: "root@1234"
+ MYSQL_DATABASE: "swtest"
+ MYSQL_ROOT_HOST: "%"
+ healthcheck:
+ test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/3306"
]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ data-source-1:
+ <<: *ds
+
+networks:
+ e2e:
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/download-mysql.sh
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/download-mysql.sh
new file mode 100755
index 0000000000..7ffd5e8760
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/download-mysql.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+# Target directory for the driver jar, passed by the container entrypoint.
+LIB_HOME=$1
+MYSQL_URL="https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.13/mysql-connector-java-8.0.13.jar"
+MYSQL_DRIVER="mysql-connector-java-8.0.13.jar"
+
+# ensure the curl command been installed
+if ! command -v curl &> /dev/null; then
+  apt update -y && apt install -y curl
+fi
+
+# -f makes curl exit non-zero on HTTP errors (e.g. 404); without it a
+# server error page would be saved as the jar and the check below would
+# wrongly succeed. The URL is quoted as a defensive shell habit.
+if ! curl -fLo "${LIB_HOME}/${MYSQL_DRIVER}" "${MYSQL_URL}"; then
+  echo "Fail to download ${MYSQL_DRIVER}."
+  exit 1
+fi
diff --git
a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/log4j2.xml
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..cd672826bb
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<Configuration status="INFO">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x
- %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="INFO">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/pom.xml b/pom.xml
index aaf2228e86..ab76862160 100755
--- a/pom.xml
+++ b/pom.xml
@@ -339,6 +339,7 @@
<skip>${skipUTs}</skip>
<excludes>
<exclude>IT*.class</exclude>
+ <exclude>ShardingIntegrationTest.class</exclude>
</excludes>
</configuration>
</plugin>