tombentley commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817623175
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -127,10 +131,26 @@ public DelegatingClassLoader(List<String> pluginPaths) {
this(pluginPaths, DelegatingClassLoader.class.getClassLoader());
}
+ @SuppressWarnings("unchecked")
public Set<PluginDesc<Connector>> connectors() {
+ Set<PluginDesc<Connector>> connectors = new TreeSet<>();
+ for (PluginDesc<SinkConnector> sinkConnector : sinkConnectors) {
+ connectors.add((PluginDesc<Connector>) (PluginDesc<? extends
Connector>) sinkConnector);
+ }
+ for (PluginDesc<SourceConnector> sourceConnector : sourceConnectors) {
+ connectors.add((PluginDesc<Connector>) (PluginDesc<? extends
Connector>) sourceConnector);
+ }
return connectors;
Review comment:
```suggestion
Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set)
sinkConnectors);
connectors.addAll((Set) sourceConnectors);
return connectors;
```
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
return keys;
}
+ @Override
+ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
Review comment:
Should it really be called `connectorPluginConfig` when it handles other
plugins too?
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -17,30 +17,32 @@
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
import java.util.Objects;
public class ConnectorPluginInfo {
Review comment:
Should the name remain as `ConnectorPluginInfo` when it's no longer just
for connector plugins?
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
##########
@@ -17,22 +17,24 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Locale;
public enum PluginType {
SOURCE(SourceConnector.class),
SINK(SinkConnector.class),
- CONNECTOR(Connector.class),
Review comment:
This makes me wonder why CONNECTOR was ever in this enum. Do you know
@mimaison / @C0urante ?
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -100,21 +135,23 @@ public ConfigInfos validateConfigs(
@GET
@Path("/")
- public List<ConnectorPluginInfo> listConnectorPlugins() {
- return getConnectorPlugins();
- }
-
- // TODO: improve once plugins are allowed to be added/removed during
runtime.
- private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
- if (connectorPlugins.isEmpty()) {
- for (PluginDesc<Connector> plugin : herder.plugins().connectors())
{
- if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
- connectorPlugins.add(new ConnectorPluginInfo(plugin));
- }
+ public List<ConnectorPluginInfo>
listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly")
boolean connectorsOnly) {
+ synchronized (this) {
+ if (connectorsOnly) {
+ Set<String> types = new
HashSet<>(Arrays.asList(PluginType.SINK.toString(),
PluginType.SOURCE.toString()));
+ return
Collections.unmodifiableList(connectorPlugins.stream().filter(p ->
types.contains(p.type())).collect(Collectors.toList()));
Review comment:
Is it worth using `Set.contains` for this, rather than `.equals` and
`||`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]