This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c6f3b4348f0 Revert YAML line break (#29638)
c6f3b4348f0 is described below
commit c6f3b4348f02f0ab1fff3e5faea4f72cec9ffcdd
Author: Raigor <[email protected]>
AuthorDate: Wed Jan 3 14:50:44 2024 +0800
Revert YAML line break (#29638)
---
.../shardingsphere/infra/util/yaml/YamlEngine.java | 3 +-
.../pipeline/cdc/core/importer/CDCImporter.java | 12 +--
.../ral/queryable/ExportMetaDataExecutorTest.java | 118 ++++++++++++++++++++-
3 files changed, 121 insertions(+), 12 deletions(-)
diff --git a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/yaml/YamlEngine.java b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/yaml/YamlEngine.java
index 44f75ebadce..0c78f1359bf 100644
--- a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/yaml/YamlEngine.java
+++ b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/yaml/YamlEngine.java
@@ -22,7 +22,6 @@ import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.infra.util.yaml.constructor.ShardingSphereYamlConstructor;
import
org.apache.shardingsphere.infra.util.yaml.representer.ShardingSphereYamlRepresenter;
import org.yaml.snakeyaml.DumperOptions;
-import org.yaml.snakeyaml.DumperOptions.LineBreak;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.representer.Representer;
@@ -106,7 +105,7 @@ public final class YamlEngine {
*/
public static String marshal(final Object value) {
DumperOptions dumperOptions = new DumperOptions();
- dumperOptions.setLineBreak(LineBreak.UNIX);
+ dumperOptions.setLineBreak(DumperOptions.LineBreak.getPlatformLineBreak());
if (value instanceof Collection) {
return new Yaml(new ShardingSphereYamlRepresenter(dumperOptions),
dumperOptions).dumpAs(value, null, DumperOptions.FlowStyle.BLOCK);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index bc65fee2ff0..be3f1312b3a 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -92,24 +92,24 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
}
private void doWithoutSorting() {
- for (final CDCChannelProgressPair channelProgressPair :
originalChannelProgressPairs) {
- PipelineChannel channel = channelProgressPair.getChannel();
- List<Record> records = channel.fetch(batchSize,
timeoutMillis).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
+ for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+ PipelineChannel channel = each.getChannel();
+ List<Record> records = channel.fetch(batchSize,
timeoutMillis).stream().filter(record -> !(record instanceof
PlaceholderRecord)).collect(Collectors.toList());
if (records.isEmpty()) {
continue;
}
Record lastRecord = records.get(records.size() - 1);
if (lastRecord instanceof FinishedRecord &&
records.stream().noneMatch(DataRecord.class::isInstance)) {
channel.ack(records);
- channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
- originalChannelProgressPairs.remove(channelProgressPair);
+ each.getJobProgressListener().onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
+ originalChannelProgressPairs.remove(each);
continue;
}
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT,
1);
}
String ackId = CDCAckId.build(importerId).marshal();
- ackCache.put(ackId,
Collections.singletonList(Pair.of(channelProgressPair, new
CDCAckPosition(records.get(records.size() - 1),
getDataRecordsCount(records)))));
+ ackCache.put(ackId, Collections.singletonList(Pair.of(each, new
CDCAckPosition(records.get(records.size() - 1),
getDataRecordsCount(records)))));
sink.write(ackId, records);
}
}
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportMetaDataExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportMetaDataExecutorTest.java
index 997c06b662d..7ddd7bf5dca 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportMetaDataExecutorTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ExportMetaDataExecutorTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
+import org.apache.commons.codec.binary.Base64;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import
org.apache.shardingsphere.authority.rule.builder.DefaultAuthorityRuleConfigurationBuilder;
import
org.apache.shardingsphere.distsql.statement.ral.queryable.ExportMetaDataStatement;
@@ -41,11 +42,19 @@ import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUn
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import
org.apache.shardingsphere.proxy.backend.config.yaml.YamlProxyDataSourceConfiguration;
+import
org.apache.shardingsphere.proxy.backend.config.yaml.YamlProxyDatabaseConfiguration;
+import
org.apache.shardingsphere.proxy.backend.config.yaml.YamlProxyServerConfiguration;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import
org.apache.shardingsphere.proxy.backend.distsql.export.ExportedClusterInfo;
+import org.apache.shardingsphere.proxy.backend.distsql.export.ExportedMetaData;
import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -71,6 +80,10 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -79,10 +92,10 @@ import static org.mockito.Mockito.when;
@StaticMockSettings(ProxyContext.class)
class ExportMetaDataExecutorTest {
- private static final String EXPECTED_METADATA_VALUE =
"eyJtZXRhX2RhdGEiOnsiZGF0YWJhc2VzIjp7ImVtcHR5X21ldGFkYXRhIjoiZGF0YWJhc2VOYW1lOiBudWxsXG5kYXRhU291cmNlczpcbn"
+ private static final String EXPECTED_EMPTY_METADATA_VALUE =
"eyJtZXRhX2RhdGEiOnsiZGF0YWJhc2VzIjp7ImVtcHR5X21ldGFkYXRhIjoiZGF0YWJhc2VOYW1lOiBudWxsXG5kYXRhU291cmNlczpcbn"
+
"J1bGVzOlxuIn0sInByb3BzIjoiIiwicnVsZXMiOiJydWxlczpcbi0gIUdMT0JBTF9DTE9DS1xuICBlbmFibGVkOiBmYWxzZVxuICBwcm92aWRlcjogbG9jYWxcbiAgdHlwZTogVFNPXG4ifX0=";
- private static final String EXPECTED_EXPORT_METADATA_CONFIGURATION =
"eyJtZXRhX2RhdGEiOnsiZGF0YWJhc2VzIjp7Im5vcm1hbF9kYiI6ImRhdGFiYXNlTmFtZTogbm9ybWFsX2RiXG5kYXRhU291cm"
+ private static final String EXPECTED_NOT_EMPTY_METADATA_VALUE =
"eyJtZXRhX2RhdGEiOnsiZGF0YWJhc2VzIjp7Im5vcm1hbF9kYiI6ImRhdGFiYXNlTmFtZTogbm9ybWFsX2RiXG5kYXRhU291cm"
+
"NlczpcbiAgZHNfMDpcbiAgICBwYXNzd29yZDogXG4gICAgdXJsOiBqZGJjOm9wZW5nYXVzczovLzEyNy4wLjAuMTo1NDMyL2RlbW9fZHNfMFxuICAgIHVzZXJuYW1lOiByb290XG4gICAgbWluUG9vb"
+
"FNpemU6IDFcbiAgICBtYXhQb29sU2l6ZTogNTBcbiAgZHNfMTpcbiAgICBwYXNzd29yZDogXG4gICAgdXJsOiBqZGJjOm9wZW5nYXVzczovLzEyNy4wLjAuMTo1NDMyL2RlbW9fZHNfMVxuICAgIHVzZ"
+
"XJuYW1lOiByb290XG4gICAgbWluUG9vbFNpemU6IDFcbiAgICBtYXhQb29sU2l6ZTogNTBcbiJ9LCJwcm9wcyI6InByb3BzOlxuICBzcWwtc2hvdzogdHJ1ZVxuIiwicnVsZXMiOiJydWxlczpcbi0g"
@@ -119,7 +132,7 @@ class ExportMetaDataExecutorTest {
Collection<LocalDataQueryResultRow> actual = new
ExportMetaDataExecutor().getRows(contextManager.getMetaDataContexts().getMetaData(),
sqlStatement);
assertThat(actual.size(), is(1));
LocalDataQueryResultRow row = actual.iterator().next();
- assertThat(row.getCell(3), is(EXPECTED_METADATA_VALUE));
+ assertMetaData(row.getCell(3), EXPECTED_EMPTY_METADATA_VALUE);
}
private ContextManager mockEmptyContextManager() {
@@ -146,7 +159,7 @@ class ExportMetaDataExecutorTest {
Collection<LocalDataQueryResultRow> actual = new
ExportMetaDataExecutor().getRows(contextManager.getMetaDataContexts().getMetaData(),
new ExportMetaDataStatement(null));
assertThat(actual.size(), is(1));
LocalDataQueryResultRow row = actual.iterator().next();
- assertThat(row.getCell(3).toString(),
is(EXPECTED_EXPORT_METADATA_CONFIGURATION));
+ assertMetaData(row.getCell(3), EXPECTED_NOT_EMPTY_METADATA_VALUE);
}
private Map<String, StorageUnit> createStorageUnits() {
@@ -192,4 +205,101 @@ class ExportMetaDataExecutorTest {
result.setMinPoolSize(1);
return result;
}
+
+ private void assertMetaData(final Object actual, final String expected) {
+ assertNotNull(actual);
+ assertInstanceOf(String.class, actual);
+ assertMetaData(convertToExportedClusterInfo((String) actual),
convertToExportedClusterInfo(expected));
+ }
+
+ private void assertMetaData(final ExportedClusterInfo actual, final
ExportedClusterInfo expected) {
+ assertServerConfig(actual.getMetaData(), expected.getMetaData());
+ assertDatabaseConfig(actual.getMetaData().getDatabases(),
expected.getMetaData().getDatabases());
+ }
+
+ private void assertServerConfig(final ExportedMetaData actual, final
ExportedMetaData expected) {
+ if (null == expected) {
+ assertNull(actual);
+ return;
+ }
+ YamlProxyServerConfiguration actualServerConfig =
convertToYamlProxyServerConfig(actual.getRules() + System.lineSeparator() +
actual.getProps());
+ YamlProxyServerConfiguration expectedServerConfig =
convertToYamlProxyServerConfig(expected.getRules() + System.lineSeparator() +
expected.getProps());
+ if (null == expectedServerConfig) {
+ assertNull(actualServerConfig);
+ return;
+ }
+ assertRules(actualServerConfig.getRules(),
expectedServerConfig.getRules());
+ assertProps(actualServerConfig.getProps(),
expectedServerConfig.getProps());
+ }
+
+ private void assertRules(final Collection<YamlRuleConfiguration> actual,
final Collection<YamlRuleConfiguration> expected) {
+ if (null == expected) {
+ assertNull(actual);
+ return;
+ }
+ assertThat(actual.size(), is(expected.size()));
+ for (YamlRuleConfiguration each : expected) {
+ assertTrue(actual.stream().anyMatch(rule ->
rule.getRuleConfigurationType().equals(each.getRuleConfigurationType())));
+ }
+ }
+
+ private void assertProps(final Properties actual, final Properties
expected) {
+ if (null == expected) {
+ assertNull(actual);
+ return;
+ }
+ assertThat(actual.size(), is(expected.size()));
+ for (Entry<Object, Object> entry : expected.entrySet()) {
+ assertThat(actual.get(entry.getKey()), is(entry.getValue()));
+ }
+ }
+
+ private void assertDatabaseConfig(final Map<String, String> actual, final
Map<String, String> expected) {
+ assertThat(actual.size(), is(expected.size()));
+ for (Entry<String, String> entry : expected.entrySet()) {
+ assertDatabaseConfig(convertToYamlProxyDatabaseConfig(actual.get(entry.getKey())), convertToYamlProxyDatabaseConfig(entry.getValue()));
+ }
+ }
+
+ private void assertDatabaseConfig(final YamlProxyDatabaseConfiguration
actual, final YamlProxyDatabaseConfiguration expected) {
+ assertThat(actual.getDatabaseName(), is(expected.getDatabaseName()));
+ assertThat(actual.getSchemaName(), is(expected.getSchemaName()));
+ assertDataSources(actual.getDataSources(), expected.getDataSources());
+ assertRules(actual.getRules(), expected.getRules());
+ }
+
+ private void assertDataSources(final Map<String,
YamlProxyDataSourceConfiguration> actual, final Map<String,
YamlProxyDataSourceConfiguration> expected) {
+ if (null == expected) {
+ assertNull(actual);
+ return;
+ }
+ assertThat(actual.size(), is(expected.size()));
+ for (Entry<String, YamlProxyDataSourceConfiguration> entry :
expected.entrySet()) {
+ YamlProxyDataSourceConfiguration actualDataSourceConfig =
actual.get(entry.getKey());
+ YamlProxyDataSourceConfiguration exceptedDataSourceConfig =
entry.getValue();
+ assertThat(actualDataSourceConfig.getDataSourceClassName(),
is(exceptedDataSourceConfig.getDataSourceClassName()));
+ assertThat(actualDataSourceConfig.getUrl(),
is(exceptedDataSourceConfig.getUrl()));
+ assertThat(actualDataSourceConfig.getUsername(),
is(exceptedDataSourceConfig.getUsername()));
+ assertThat(actualDataSourceConfig.getPassword(),
is(exceptedDataSourceConfig.getPassword()));
+ assertThat(actualDataSourceConfig.getConnectionTimeoutMilliseconds(), is(exceptedDataSourceConfig.getConnectionTimeoutMilliseconds()));
+ assertThat(actualDataSourceConfig.getIdleTimeoutMilliseconds(),
is(exceptedDataSourceConfig.getIdleTimeoutMilliseconds()));
+ assertThat(actualDataSourceConfig.getMaxLifetimeMilliseconds(),
is(exceptedDataSourceConfig.getMaxLifetimeMilliseconds()));
+ assertThat(actualDataSourceConfig.getMaxPoolSize(),
is(exceptedDataSourceConfig.getMaxPoolSize()));
+ assertThat(actualDataSourceConfig.getMinPoolSize(),
is(exceptedDataSourceConfig.getMinPoolSize()));
+ assertThat(actualDataSourceConfig.getReadOnly(),
is(exceptedDataSourceConfig.getReadOnly()));
+ assertProps(actualDataSourceConfig.getCustomPoolProps(),
exceptedDataSourceConfig.getCustomPoolProps());
+ }
+ }
+
+ private ExportedClusterInfo convertToExportedClusterInfo(final String
base64String) {
+ return JsonUtils.fromJsonString(new
String(Base64.decodeBase64(base64String)), ExportedClusterInfo.class);
+ }
+
+ private YamlProxyServerConfiguration convertToYamlProxyServerConfig(final
String serverConfig) {
+ return YamlEngine.unmarshal(serverConfig,
YamlProxyServerConfiguration.class);
+ }
+
+ private YamlProxyDatabaseConfiguration
convertToYamlProxyDatabaseConfig(final String databaseConfig) {
+ return YamlEngine.unmarshal(databaseConfig,
YamlProxyDatabaseConfiguration.class);
+ }
}