C0urante commented on a change in pull request #11572: URL: https://github.com/apache/kafka/pull/11572#discussion_r814970233
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java ########## @@ -57,15 +71,46 @@ private final Herder herder; private final List<ConnectorPluginInfo> connectorPlugins; - private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList( + static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList( VerifiableSourceConnector.class, VerifiableSinkConnector.class, MockConnector.class, MockSourceConnector.class, MockSinkConnector.class, SchemaSourceConnector.class ); + @SuppressWarnings("rawtypes") + static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = Collections.singletonList( + PredicatedTransformation.class + ); + public ConnectorPluginsResource(Herder herder) { this.herder = herder; this.connectorPlugins = new ArrayList<>(); + + // TODO: improve once plugins are allowed to be added/removed during runtime. + for (PluginDesc<SinkConnector> plugin : herder.plugins().sinkConnectors()) { + if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { + connectorPlugins.add(new ConnectorPluginInfo(plugin)); + } + } + for (PluginDesc<SourceConnector> plugin : herder.plugins().sourceConnectors()) { + if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { + connectorPlugins.add(new ConnectorPluginInfo(plugin)); + } + } + for (PluginDesc<Transformation<?>> transform : herder.plugins().transformations()) { + if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) { + connectorPlugins.add(new ConnectorPluginInfo(transform)); + } + } + for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) { + connectorPlugins.add(new ConnectorPluginInfo(predicate)); + } + for (PluginDesc<Converter> converter : herder.plugins().converters()) { + connectorPlugins.add(new ConnectorPluginInfo(converter)); + } + for (PluginDesc<HeaderConverter> headerConverter : herder.plugins().headerConverters()) { + connectorPlugins.add(new ConnectorPluginInfo(headerConverter)); + } Review comment: Now that we have separate `Plugins::sinkConnectors` and `Plugins::sourceConnectors` methods, we can abstract this a little, which should improve readability a bit and make it easier to extend for other plugin types in the future: ```suggestion static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES = Arrays.asList( VerifiableSinkConnector.class, MockSinkConnector.class ); static final List<Class<? extends SourceConnector>> SOURCE_CONNECTOR_EXCLUDES = Arrays.asList( VerifiableSourceConnector.class, MockSourceConnector.class, SchemaSourceConnector.class ); @SuppressWarnings({"unchecked", "rawtypes"}) static final List<Class<? extends Transformation<?>>> TRANSFORM_EXCLUDES = Collections.singletonList( (Class) PredicatedTransformation.class ); public ConnectorPluginsResource(Herder herder) { this.herder = herder; this.connectorPlugins = new ArrayList<>(); // TODO: improve once plugins are allowed to be added/removed during runtime. addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES); addConnectorPlugins(herder.plugins().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES); addConnectorPlugins(herder.plugins().transformations(), TRANSFORM_EXCLUDES); addConnectorPlugins(herder.plugins().predicates(), Collections.emptySet()); addConnectorPlugins(herder.plugins().converters(), Collections.emptySet()); addConnectorPlugins(herder.plugins().headerConverters(), Collections.emptySet()); } private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins, Collection<Class<? extends T>> excludes) { plugins.stream() .filter(p -> !excludes.contains(p.pluginClass())) .map(ConnectorPluginInfo::new) .forEach(connectorPlugins::add); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -750,4 +755,41 @@ private String trace(Throwable t) { return keys; } + @Override + public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { + List<ConfigKeyInfo> results = new ArrayList<>(); + ConfigDef configDefs; + try { + Plugins p = plugins(); + Object plugin = p.newPlugin(pluginName); + PluginType pluginType = PluginType.from(plugin.getClass()); + switch (pluginType) { + case SINK: + case SOURCE: + configDefs = ((Connector) plugin).config(); + break; + case CONVERTER: + configDefs = ((Converter) plugin).config(); + break; + case HEADER_CONVERTER: + configDefs = ((HeaderConverter) plugin).config(); + break; + case TRANSFORMATION: + configDefs = ((Transformation<?>) plugin).config(); + break; + case PREDICATE: + configDefs = ((Predicate<?>) plugin).config(); + break; + default: + throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); + } + } catch (ClassNotFoundException cnfe) { + throw new BadRequestException("Unknown plugin " + pluginName + "."); Review comment: Nit: a 404 here might make more sense ```suggestion throw new NotFoundException("Unknown plugin " + pluginName + "."); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -438,7 +457,8 @@ private PluginScanResult scanPluginPath( } private void addAllAliases() { - addAliases(connectors); + addAliases(sinkConnectors); + addAliases(sourceConnectors); Review comment: I think we may want to add aliases for all connectors at once, in order to account for the `PluginUtils::isAliasUnique` check used to guarantee that it's safe to add an alias for a plugin. If (unlikely but possible) there's a source and a sink connector that have the same simple name, this would cause the alias for the source to shadow the alias for the sink. I guess that's not the worst thing but it's a deviation from current behavior and we probably don't want to let people notice that new behavior and start building against it with the expectation that it's supported and won't change later. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java ########## @@ -901,6 +902,75 @@ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors( assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2"); } + @Test + public void testConnectorPluginConfig() throws Exception { + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) + .withConstructor( + Worker.class, + String.class, + String.class, + StatusBackingStore.class, + ConfigBackingStore.class, + ConnectorClientConfigOverridePolicy.class + ) + .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) + .addMockedMethod("generation") + .createMock(); + + EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> { + String name = (String) EasyMock.getCurrentArguments()[0]; + switch (name) { + case "sink": return new SampleSinkConnector(); + case "source": return new SampleSourceConnector(); + case "converter": return new SampleConverterWithHeaders(); + case "header-converter": return new SampleHeaderConverter(); + case "predicate": return new SamplePredicate(); + default: return new SampleTransformation<>(); + } + }).anyTimes(); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); + replayAll(); + + List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink"); + assertNotNull(sinkConnectorConfigs); + assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size()); + + List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source"); + assertNotNull(sourceConnectorConfigs); + assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size()); + + List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter"); + assertNotNull(converterConfigs); + assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size()); + + List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter"); + assertNotNull(headerConverterConfigs); + assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size()); + + List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate"); + assertNotNull(predicateConfigs); + assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size()); + + List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation"); + assertNotNull(transformationConfigs); + assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size()); + } + + @Test(expected = BadRequestException.class) + public void testGetConnectorConfigDefWithBadName() throws Exception { + String connName = "AnotherPlugin"; + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) + .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class, + ConnectorClientConfigOverridePolicy.class) + .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) + .addMockedMethod("generation") + .createMock(); + EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); + EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new ClassNotFoundException()); + replayAll(); + herder.connectorPluginConfig(connName); + } + Review comment: Should we add a case for an unsupported plugin type, like a REST extension? ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ########## @@ -81,38 +84,83 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest(RestClient.class) -@PowerMockIgnore("javax.management.*") public class ConnectorPluginsResourceTest { - private static Map<String, String> props; - private static Map<String, String> partialProps = new HashMap<>(); + private static final Map<String, String> PROPS; + private static final Map<String, String> PARTIAL_PROPS = new HashMap<>(); static { - partialProps.put("name", "test"); - partialProps.put("test.string.config", "testString"); - partialProps.put("test.int.config", "1"); - partialProps.put("test.list.config", "a,b"); - - props = new HashMap<>(partialProps); - props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName()); - props.put("plugin.path", "test.path"); + PARTIAL_PROPS.put("name", "test"); + PARTIAL_PROPS.put("test.string.config", "testString"); + PARTIAL_PROPS.put("test.int.config", "1"); + PARTIAL_PROPS.put("test.list.config", "a,b"); + + PROPS = new HashMap<>(PARTIAL_PROPS); + PROPS.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName()); + PROPS.put("plugin.path", null); Review comment: Do we need this line at all now? ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ########## @@ -339,44 +369,45 @@ public void testValidateConfigWithNonExistentName() { // simple name but different package. String customClassname = "com.custom.package." + ConnectorPluginsResourceTestConnector.class.getSimpleName(); - assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs(customClassname, props)); + assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs(customClassname, PROPS)); } @Test public void testValidateConfigWithNonExistentAlias() { - assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs("ConnectorPluginsTest", props)); + assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs("ConnectorPluginsTest", PROPS)); } @Test - public void testListConnectorPlugins() throws Exception { - expectPlugins(); - Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins()); - assertFalse(connectorPlugins.contains(newInfo(Connector.class, "0.0"))); - assertFalse(connectorPlugins.contains(newInfo(SourceConnector.class, "0.0"))); - assertFalse(connectorPlugins.contains(newInfo(SinkConnector.class, "0.0"))); - assertFalse(connectorPlugins.contains(newInfo(VerifiableSourceConnector.class))); - assertFalse(connectorPlugins.contains(newInfo(VerifiableSinkConnector.class))); - assertFalse(connectorPlugins.contains(newInfo(MockSourceConnector.class))); - assertFalse(connectorPlugins.contains(newInfo(MockSinkConnector.class))); - assertFalse(connectorPlugins.contains(newInfo(MockConnector.class))); - assertFalse(connectorPlugins.contains(newInfo(SchemaSourceConnector.class))); - assertTrue(connectorPlugins.contains(newInfo(ConnectorPluginsResourceTestConnector.class))); - PowerMock.verifyAll(); + public void testListConnectorPlugins() { + Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true)); + + List<Set<MockConnectorPluginDesc<?>>> allConnectorPlugins = Arrays.asList( + SINK_CONNECTOR_PLUGINS, + SOURCE_CONNECTOR_PLUGINS); + for (Set<MockConnectorPluginDesc<?>> plugins : allConnectorPlugins) { + for (MockConnectorPluginDesc<?> plugin : plugins) { + boolean contained = connectorPlugins.contains(newInfo(plugin)); + if ((plugin.type() != PluginType.SOURCE && plugin.type() != PluginType.SINK) || + (ConnectorPluginsResource.CONNECTOR_EXCLUDES.contains(plugin.pluginClass()))) { + assertFalse(contained); + } else { + assertTrue(contained); + } + } + } + verify(herder, atLeastOnce()).plugins(); Review comment: It'd be more powerful to do an assertion on the complete set of returned plugins, since that will only require one test run to discover all differences between the expected plugins and the actual ones: ```suggestion Set<Class<?>> excludes = Stream.of(ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES, ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES) .flatMap(Collection::stream) .collect(Collectors.toSet()); Set<ConnectorPluginInfo> expectedConnectorPlugins = Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS) .flatMap(Collection::stream) .filter(p -> !excludes.contains(p.pluginClass())) .map(ConnectorPluginsResourceTest::newInfo) .collect(Collectors.toSet()); Set<ConnectorPluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true)); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); verify(herder, atLeastOnce()).plugins(); ``` (This assumes we split out `CONNECTOR_EXCLUDES`, but the same general strategy should apply even if we don't). ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java ########## @@ -29,7 +29,7 @@ /** * @see VerifiableSinkTask */ -public class VerifiableSinkConnector extends SourceConnector { +public class VerifiableSinkConnector extends SinkConnector { Review comment: Ha, nice catch! ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java ########## @@ -901,6 +902,75 @@ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors( assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2"); } + @Test + public void testConnectorPluginConfig() throws Exception { + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) + .withConstructor( + Worker.class, + String.class, + String.class, + StatusBackingStore.class, + ConfigBackingStore.class, + ConnectorClientConfigOverridePolicy.class + ) + .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) + .addMockedMethod("generation") + .createMock(); + + EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> { + String name = (String) EasyMock.getCurrentArguments()[0]; + switch (name) { + case "sink": return new SampleSinkConnector(); + case "source": return new SampleSourceConnector(); + case "converter": return new SampleConverterWithHeaders(); + case "header-converter": return new SampleHeaderConverter(); + case "predicate": return new SamplePredicate(); + default: return new SampleTransformation<>(); + } + }).anyTimes(); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); + replayAll(); + + List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink"); + assertNotNull(sinkConnectorConfigs); + assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size()); + + List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source"); + assertNotNull(sourceConnectorConfigs); + assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size()); + + List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter"); + assertNotNull(converterConfigs); + assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size()); + + List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter"); + assertNotNull(headerConverterConfigs); + assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size()); + + List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate"); + assertNotNull(predicateConfigs); + assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size()); + + List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation"); + assertNotNull(transformationConfigs); + assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size()); + } + + @Test(expected = BadRequestException.class) Review comment: (If we choose to send a 404 when a plugin isn't found instead of a 400) ```suggestion @Test(expected = NotFoundException.class) ``` (Will also require adding an import for the `NotFoundException` class) ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ########## @@ -144,70 +192,63 @@ CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs); PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs); + } - List<Class<? extends Connector>> abstractConnectorClasses = asList( - Connector.class, - SourceConnector.class, - SinkConnector.class - ); - - List<Class<? extends Connector>> connectorClasses = asList( - VerifiableSourceConnector.class, - VerifiableSinkConnector.class, - MockSourceConnector.class, - MockSinkConnector.class, - MockConnector.class, - SchemaSourceConnector.class, - ConnectorPluginsResourceTestConnector.class - ); + private final Herder herder = mock(DistributedHerder.class); + private final Plugins plugins = mock(Plugins.class); + private ConnectorPluginsResource connectorPluginsResource; + @SuppressWarnings("rawtypes") + @Before + public void setUp() throws Exception { try { - for (Class<? extends Connector> klass : abstractConnectorClasses) { - MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass, "0.0.0"); - CONNECTOR_PLUGINS.add(pluginDesc); + for (Class<? extends SinkConnector> klass : sinkConnectorClasses) { + MockConnectorPluginDesc<? extends SinkConnector> pluginDesc = new MockConnectorPluginDesc<>(klass); + SINK_CONNECTOR_PLUGINS.add(pluginDesc); Review comment: This field (`SINK_CONNECTOR_PLUGINS`) and others like it are `static`, but this method is executed once per test. Should we populate them once in a `static` block instead of re-populating them every time we run a test? I know it's unlikely to have a performance impact but it might make the tests easier to read. Also, if we use a static initializer block, we could wrap `SINK_CONNECTOR_PLUGINS` (and other similar fields) with [`Collections::unmodifiableList`](https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#unmodifiableList-java.util.List-) in order to prevent accidental modifications of them across test cases. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ########## @@ -144,70 +192,63 @@ CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs); PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs); + } - List<Class<? extends Connector>> abstractConnectorClasses = asList( - Connector.class, - SourceConnector.class, - SinkConnector.class - ); - - List<Class<? extends Connector>> connectorClasses = asList( - VerifiableSourceConnector.class, - VerifiableSinkConnector.class, - MockSourceConnector.class, - MockSinkConnector.class, - MockConnector.class, - SchemaSourceConnector.class, - ConnectorPluginsResourceTestConnector.class - ); + private final Herder herder = mock(DistributedHerder.class); + private final Plugins plugins = mock(Plugins.class); + private ConnectorPluginsResource connectorPluginsResource; + @SuppressWarnings("rawtypes") + @Before + public void setUp() throws Exception { try { - for (Class<? extends Connector> klass : abstractConnectorClasses) { - MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass, "0.0.0"); - CONNECTOR_PLUGINS.add(pluginDesc); + for (Class<? extends SinkConnector> klass : sinkConnectorClasses) { + MockConnectorPluginDesc<? extends SinkConnector> pluginDesc = new MockConnectorPluginDesc<>(klass); + SINK_CONNECTOR_PLUGINS.add(pluginDesc); + } + for (Class<? extends SourceConnector> klass : sourceConnectorClasses) { + MockConnectorPluginDesc<? extends SourceConnector> pluginDesc = new MockConnectorPluginDesc<>(klass); + SOURCE_CONNECTOR_PLUGINS.add(pluginDesc); + } + for (Class<? extends Converter> klass : converterClasses) { + MockConnectorPluginDesc<? extends Converter> pluginDesc = new MockConnectorPluginDesc<>(klass); + CONVERTER_PLUGINS.add(pluginDesc); + } + for (Class<? extends HeaderConverter> klass : headerConverterClasses) { + MockConnectorPluginDesc<? extends HeaderConverter> pluginDesc = new MockConnectorPluginDesc<>(klass); + HEADER_CONVERTER_PLUGINS.add(pluginDesc); + } + for (Class<? extends Transformation> klass : transformationClasses) { + MockConnectorPluginDesc<? extends Transformation> pluginDesc = new MockConnectorPluginDesc<>(klass); + TRANSFORMATION_PLUGINS.add(pluginDesc); } - for (Class<? extends Connector> klass : connectorClasses) { - MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass); - CONNECTOR_PLUGINS.add(pluginDesc); + for (Class<? extends Predicate> klass : predicateClasses) { + MockConnectorPluginDesc<? extends Predicate> pluginDesc = new MockConnectorPluginDesc<>(klass); + PREDICATE_PLUGINS.add(pluginDesc); } } catch (Exception e) { throw new RuntimeException(e); } - } - - @Mock - private Herder herder; - @Mock - private Plugins plugins; - private ConnectorPluginsResource connectorPluginsResource; - - @Before - public void setUp() throws Exception { - PowerMock.mockStatic(RestClient.class, - RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class)); Review comment: Good catch, thanks for cleaning this up! ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ########## @@ -399,19 +430,59 @@ public void testConnectorPluginsIncludesTypeAndVersionInformation() throws Excep ); } - protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass, String version) { - return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass, version)); + @Test + public void testListAllPlugins() { + Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false)); + + List<Set<MockConnectorPluginDesc<?>>> allPlugins = Arrays.asList( + SINK_CONNECTOR_PLUGINS, + SOURCE_CONNECTOR_PLUGINS, + CONVERTER_PLUGINS, + HEADER_CONVERTER_PLUGINS, + TRANSFORMATION_PLUGINS, + PREDICATE_PLUGINS); + for (Set<MockConnectorPluginDesc<?>> plugins : allPlugins) { + for (MockConnectorPluginDesc<?> plugin : plugins) { + boolean contained = connectorPlugins.contains(newInfo(plugin)); + if ((ConnectorPluginsResource.CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) || + (ConnectorPluginsResource.TRANSFORM_EXCLUDES.contains(plugin.pluginClass()))) { + assertFalse(contained); + } else { + assertTrue(contained); + } + } + verify(herder, atLeastOnce()).plugins(); + } Review comment: Same thought w/r/t performing assertions on the complete set of returned plugins: ```suggestion Set<Class<?>> excludes = Stream.of( ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES, ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES, ConnectorPluginsResource.TRANSFORM_EXCLUDES ).flatMap(Collection::stream) .collect(Collectors.toSet()); Set<ConnectorPluginInfo> expectedConnectorPlugins = Stream.of( SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS, CONVERTER_PLUGINS, HEADER_CONVERTER_PLUGINS, TRANSFORMATION_PLUGINS, PREDICATE_PLUGINS ).flatMap(Collection::stream) .filter(p -> !excludes.contains(p.pluginClass())) .map(ConnectorPluginsResourceTest::newInfo) .collect(Collectors.toSet()); Set<ConnectorPluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false)); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); verify(herder, atLeastOnce()).plugins(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org