chia7712 commented on code in PR #17019:
URL: https://github.com/apache/kafka/pull/17019#discussion_r1736119443
##########
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##########
@@ -163,82 +174,135 @@ private Store getStore(ExtensionContext context) {
return context.getStore(Namespace.create(context.getUniqueId()));
}
+ private int getTestRepeatCount() {
+ int count;
+ try {
+ String repeatCount =
System.getProperty(CLUSTER_TEST_REPEAT_SYSTEM_PROP, "1");
+ count = Integer.parseInt(repeatCount);
+ } catch (NumberFormatException e) {
+ count = 1;
+ }
+ return count;
+ }
+
List<TestTemplateInvocationContext>
processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
if (annot.value().trim().isEmpty()) {
throw new IllegalStateException("ClusterTemplate value can't be
empty string.");
}
String baseDisplayName = context.getRequiredTestMethod().getName();
- List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(context, annot.value())
- .stream().flatMap(config -> config.clusterTypes().stream()
- .map(type -> type.invocationContexts(baseDisplayName,
config))).collect(Collectors.toList());
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(repeatCount, context, annot.value())
+ .stream()
+ .flatMap(config -> config.clusterTypes().stream().map(type ->
type.invocationContexts(baseDisplayName, config)))
+ .collect(Collectors.toList());
if (contexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method
should provide at least one config");
}
- return contexts;
+ return repeatTestContexts(contexts);
}
@SuppressWarnings("unchecked")
- private List<ClusterConfig> generateClusterConfigurations(ExtensionContext
context, String generateClustersMethods) {
+ private List<ClusterConfig> generateClusterConfigurations(
+ int repeatCount,
+ ExtensionContext context,
+ String generateClustersMethods
+ ) {
Object testInstance = context.getTestInstance().orElse(null);
Method method =
ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(),
generateClustersMethods);
- return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method,
testInstance);
+ List<ClusterConfig> configs = new ArrayList<>();
+ for (int i = 0; i < repeatCount; i++) {
+ configs.addAll((List<ClusterConfig>)
ReflectionUtils.invokeMethod(method, testInstance));
+ }
+ return configs;
}
- private List<TestTemplateInvocationContext>
processClusterTests(ExtensionContext context, ClusterTests annots,
ClusterTestDefaults defaults) {
-
- List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
- .flatMap(annot -> processClusterTestInternal(context, annot,
defaults).stream()).collect(Collectors.toList());
+ private List<TestTemplateInvocationContext> processClusterTests(
+ ExtensionContext context,
+ ClusterTest[] clusterTests,
+ ClusterTestDefaults defaults
+ ) {
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> ret =
repeatedClusterTests(repeatCount, clusterTests)
Review Comment:
We can leverage `IntStream` and `flatMap`. for example:
```java
List<TestTemplateInvocationContext> ret = IntStream.range(0,
repeatCount)
.mapToObj(ignored -> Arrays.stream(clusterTests))
.flatMap(Function.identity())
.flatMap(clusterTest -> processClusterTestInternal(context,
clusterTest, defaults).stream())
.collect(Collectors.toList());
```
##########
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##########
@@ -163,82 +174,135 @@ private Store getStore(ExtensionContext context) {
return context.getStore(Namespace.create(context.getUniqueId()));
}
+ private int getTestRepeatCount() {
+ int count;
+ try {
+ String repeatCount =
System.getProperty(CLUSTER_TEST_REPEAT_SYSTEM_PROP, "1");
+ count = Integer.parseInt(repeatCount);
+ } catch (NumberFormatException e) {
+ count = 1;
+ }
+ return count;
+ }
+
List<TestTemplateInvocationContext>
processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
if (annot.value().trim().isEmpty()) {
throw new IllegalStateException("ClusterTemplate value can't be
empty string.");
}
String baseDisplayName = context.getRequiredTestMethod().getName();
- List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(context, annot.value())
- .stream().flatMap(config -> config.clusterTypes().stream()
- .map(type -> type.invocationContexts(baseDisplayName,
config))).collect(Collectors.toList());
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(repeatCount, context, annot.value())
+ .stream()
+ .flatMap(config -> config.clusterTypes().stream().map(type ->
type.invocationContexts(baseDisplayName, config)))
+ .collect(Collectors.toList());
if (contexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method
should provide at least one config");
}
- return contexts;
+ return repeatTestContexts(contexts);
}
@SuppressWarnings("unchecked")
- private List<ClusterConfig> generateClusterConfigurations(ExtensionContext
context, String generateClustersMethods) {
+ private List<ClusterConfig> generateClusterConfigurations(
+ int repeatCount,
+ ExtensionContext context,
+ String generateClustersMethods
+ ) {
Object testInstance = context.getTestInstance().orElse(null);
Method method =
ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(),
generateClustersMethods);
- return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method,
testInstance);
+ List<ClusterConfig> configs = new ArrayList<>();
+ for (int i = 0; i < repeatCount; i++) {
+ configs.addAll((List<ClusterConfig>)
ReflectionUtils.invokeMethod(method, testInstance));
+ }
+ return configs;
}
- private List<TestTemplateInvocationContext>
processClusterTests(ExtensionContext context, ClusterTests annots,
ClusterTestDefaults defaults) {
-
- List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
- .flatMap(annot -> processClusterTestInternal(context, annot,
defaults).stream()).collect(Collectors.toList());
+ private List<TestTemplateInvocationContext> processClusterTests(
+ ExtensionContext context,
+ ClusterTest[] clusterTests,
+ ClusterTestDefaults defaults
+ ) {
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> ret =
repeatedClusterTests(repeatCount, clusterTests)
+ .flatMap(clusterTest -> processClusterTestInternal(context,
clusterTest, defaults).stream())
+ .collect(Collectors.toList());
if (ret.isEmpty()) {
throw new IllegalStateException("processClusterTests method should
provide at least one config");
}
- return ret;
+ return repeatTestContexts(ret);
}
- private List<TestTemplateInvocationContext>
processClusterTest(ExtensionContext context, ClusterTest annot,
ClusterTestDefaults defaults) {
- List<TestTemplateInvocationContext> ret =
processClusterTestInternal(context, annot, defaults);
+ private List<TestTemplateInvocationContext> processClusterTestInternal(
+ ExtensionContext context,
+ ClusterTest clusterTest,
+ ClusterTestDefaults defaults
+ ) {
+ Type[] types = clusterTest.types().length == 0 ? defaults.types() :
clusterTest.types();
+ Map<String, String> serverProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(clusterTest.serverProperties()))
+ .filter(e -> e.id() == -1)
+ .collect(Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b));
- if (ret.isEmpty()) {
- throw new IllegalStateException("processClusterTest method should
provide at least one config");
- }
+ Map<Integer, Map<String, String>> perServerProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(clusterTest.serverProperties()))
+ .filter(e -> e.id() != -1)
+ .collect(Collectors.groupingBy(ClusterConfigProperty::id,
Collectors.mapping(Function.identity(),
+ Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b))));
- return ret;
+ Map<Features, Short> features = Arrays.stream(clusterTest.features())
+ .collect(Collectors.toMap(ClusterFeature::feature,
ClusterFeature::version));
+
+ ClusterConfig config = ClusterConfig.builder()
+ .setTypes(new HashSet<>(Arrays.asList(types)))
+ .setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() :
clusterTest.brokers())
+ .setControllers(clusterTest.controllers() == 0 ?
defaults.controllers() : clusterTest.controllers())
+ .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ?
defaults.disksPerBroker() : clusterTest.disksPerBroker())
+ .setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ?
defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
+ .setListenerName(clusterTest.listener().trim().isEmpty() ? null :
clusterTest.listener())
+ .setServerProperties(serverProperties)
+ .setPerServerProperties(perServerProperties)
+ .setSecurityProtocol(clusterTest.securityProtocol())
+ .setMetadataVersion(clusterTest.metadataVersion())
+ .setTags(Arrays.asList(clusterTest.tags()))
+ .setFeatures(features)
+ .build();
+
+ return Arrays.stream(types)
+ .map(type ->
type.invocationContexts(context.getRequiredTestMethod().getName(), config))
+ .collect(Collectors.toList());
}
- private List<TestTemplateInvocationContext>
processClusterTestInternal(ExtensionContext context, ClusterTest annot,
ClusterTestDefaults defaults) {
- Type[] types = annot.types().length == 0 ? defaults.types() :
annot.types();
- Map<String, String> serverProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(annot.serverProperties()))
- .filter(e -> e.id() == -1)
- .collect(Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b));
- Map<Integer, Map<String, String>> perServerProperties =
Stream.concat(Arrays.stream(defaults.serverProperties()),
Arrays.stream(annot.serverProperties()))
- .filter(e -> e.id() != -1)
- .collect(Collectors.groupingBy(ClusterConfigProperty::id,
Collectors.mapping(Function.identity(),
- Collectors.toMap(ClusterConfigProperty::key,
ClusterConfigProperty::value, (a, b) -> b))));
+ Stream<ClusterTest> repeatedClusterTests(int repeatCount, ClusterTest...
clusterTestAnnots) {
Review Comment:
maybe we can use `ClusterTest[] clusterTests` instead?
##########
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##########
@@ -163,82 +174,135 @@ private Store getStore(ExtensionContext context) {
return context.getStore(Namespace.create(context.getUniqueId()));
}
+ private int getTestRepeatCount() {
+ int count;
+ try {
+ String repeatCount =
System.getProperty(CLUSTER_TEST_REPEAT_SYSTEM_PROP, "1");
+ count = Integer.parseInt(repeatCount);
+ } catch (NumberFormatException e) {
+ count = 1;
+ }
+ return count;
+ }
+
List<TestTemplateInvocationContext>
processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
if (annot.value().trim().isEmpty()) {
throw new IllegalStateException("ClusterTemplate value can't be
empty string.");
}
String baseDisplayName = context.getRequiredTestMethod().getName();
- List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(context, annot.value())
- .stream().flatMap(config -> config.clusterTypes().stream()
- .map(type -> type.invocationContexts(baseDisplayName,
config))).collect(Collectors.toList());
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> contexts =
generateClusterConfigurations(repeatCount, context, annot.value())
+ .stream()
+ .flatMap(config -> config.clusterTypes().stream().map(type ->
type.invocationContexts(baseDisplayName, config)))
+ .collect(Collectors.toList());
if (contexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method
should provide at least one config");
}
- return contexts;
+ return repeatTestContexts(contexts);
}
@SuppressWarnings("unchecked")
- private List<ClusterConfig> generateClusterConfigurations(ExtensionContext
context, String generateClustersMethods) {
+ private List<ClusterConfig> generateClusterConfigurations(
+ int repeatCount,
+ ExtensionContext context,
+ String generateClustersMethods
+ ) {
Object testInstance = context.getTestInstance().orElse(null);
Method method =
ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(),
generateClustersMethods);
- return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method,
testInstance);
+ List<ClusterConfig> configs = new ArrayList<>();
+ for (int i = 0; i < repeatCount; i++) {
+ configs.addAll((List<ClusterConfig>)
ReflectionUtils.invokeMethod(method, testInstance));
+ }
+ return configs;
}
- private List<TestTemplateInvocationContext>
processClusterTests(ExtensionContext context, ClusterTests annots,
ClusterTestDefaults defaults) {
-
- List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
- .flatMap(annot -> processClusterTestInternal(context, annot,
defaults).stream()).collect(Collectors.toList());
+ private List<TestTemplateInvocationContext> processClusterTests(
+ ExtensionContext context,
+ ClusterTest[] clusterTests,
+ ClusterTestDefaults defaults
+ ) {
+ int repeatCount = getTestRepeatCount();
+ List<TestTemplateInvocationContext> ret =
repeatedClusterTests(repeatCount, clusterTests)
+ .flatMap(clusterTest -> processClusterTestInternal(context,
clusterTest, defaults).stream())
+ .collect(Collectors.toList());
if (ret.isEmpty()) {
throw new IllegalStateException("processClusterTests method should
provide at least one config");
}
- return ret;
+ return repeatTestContexts(ret);
Review Comment:
why we need to "repeat" the test contexts again?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]