C0urante commented on code in PR #14704:
URL: https://github.com/apache/kafka/pull/14704#discussion_r1385398359
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect
connect, String[] extraAr
}
}
+ /**
+ * Parse a connector configuration file into a {@link
CreateConnectorRequest}. The file can have any one of the following formats:
+ * <ol>
+ * <li>A JSON file containing an Object with only String keys and
values that represent the connector configuration.</li>
+ * <li>A JSON file containing an Object that can be parsed directly
into a {@link CreateConnectorRequest}</li>
+ * <li>A valid Java Properties file (i.e. containing String key/value
pairs representing the connector configuration)</li>
+ * </ol>
+ * <p>
+ * Visible for testing.
+ *
+ * @param filePath the path of the connector configuration file
+ * @return the parsed connector configuration in the form of a {@link
CreateConnectorRequest}
+ */
+ CreateConnectorRequest parseConnectorConfigurationFile(String filePath)
throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ File connectorConfigurationFile = Paths.get(filePath).toFile();
+ try {
+ Map<String, String> connectorConfigs = objectMapper.readValue(
+ connectorConfigurationFile,
+ new TypeReference<Map<String, String>>() { });
+
+ if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+ throw new ConnectException("Connector configuration at '" +
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+ + "configuration");
+ }
+ return new
CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs,
null);
+ } catch (StreamReadException | DatabindException e) {
+ log.debug("Could not parse connector configuration file '{}' into
a Map with String keys and values", filePath);
+ }
+
+ try {
+ CreateConnectorRequest createConnectorRequest =
objectMapper.readValue(connectorConfigurationFile,
+ new TypeReference<CreateConnectorRequest>() { });
+ if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+ if
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
{
+ throw new ConnectException("Connector name configuration
in 'config' doesn't match the one specified in 'name'");
Review Comment:
Can we include the file name in the error message here, like we do in the
other two name-related exceptions?
##########
docs/connect.html:
##########
@@ -293,7 +293,7 @@ <h4><a id="connect_rest" href="#connect_rest">REST
API</a></h4>
<ul>
<li><code>GET /connectors</code> - return a list of active
connectors</li>
- <li><code>POST /connectors</code> - create a new connector; the
request body should be a JSON object containing a string <code>name</code>
field and an object <code>config</code> field with the connector configuration
parameters</li>
+ <li><code>POST /connectors</code> - create a new connector; the
request body should be a JSON object containing a string <code>name</code>
field and an object <code>config</code> field with the connector configuration
parameters. The JSON object may also optionally contain a string
<code>initial_state</code> field which can take the following values -
<code>STOPPED</code>, <code>PAUSED</code> and the default value of
<code>RUNNING</code></li>
Review Comment:
Nit: wording
```suggestion
<li><code>POST /connectors</code> - create a new connector; the
request body should be a JSON object containing a string <code>name</code>
field and an object <code>config</code> field with the connector configuration
parameters. The JSON object may also optionally contain a string
<code>initial_state</code> field which can take the following values -
<code>STOPPED</code>, <code>PAUSED</code> or <code>RUNNING</code>(the
default)</li>
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect
connect, String[] extraAr
}
}
+ /**
+ * Parse a connector configuration file into a {@link
CreateConnectorRequest}. The file can have any one of the following formats:
+ * <ol>
+ * <li>A JSON file containing an Object with only String keys and
values that represent the connector configuration.</li>
+ * <li>A JSON file containing an Object that can be parsed directly
into a {@link CreateConnectorRequest}</li>
+ * <li>A valid Java Properties file (i.e. containing String key/value
pairs representing the connector configuration)</li>
Review Comment:
Can we clarify that we attempt to parse in this order?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -61,23 +71,24 @@ public ConnectStandalone(String... args) {
@Override
protected String usage() {
- return "ConnectStandalone worker.properties [connector1.properties
connector2.properties ...]";
+ return "ConnectStandalone worker.properties [connector1.properties
connector2.json ...]";
}
@Override
protected void processExtraArgs(Herder herder, Connect connect, String[]
extraArgs) {
try {
- for (final String connectorPropsFile : extraArgs) {
- Map<String, String> connectorProps =
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+ for (final String connectorConfigFile : extraArgs) {
+ CreateConnectorRequest createConnectorRequest =
parseConnectorConfigurationFile(connectorConfigFile);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>((error, info) -> {
if (error != null)
- log.error("Failed to create connector for {}",
connectorPropsFile);
+ log.error("Failed to create connector for {}",
connectorConfigFile);
else
log.info("Created connector {}", info.result().name());
});
herder.putConnectorConfig(
- connectorProps.get(ConnectorConfig.NAME_CONFIG),
- connectorProps, false, cb);
+ createConnectorRequest.name(),
createConnectorRequest.config(),
+ createConnectorRequest.initialState() != null ?
createConnectorRequest.initialState().toTargetState() : null,
Review Comment:
Nit: we can move this null-safe conversion logic to the
`CreateConnectorRequest` class, maybe with a `targetState()` or
`initialTargetState()` method.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -145,7 +146,11 @@ public Response createConnector(final @Parameter(hidden =
true) @QueryParam("for
checkAndPutConnectorConfigName(name, configs);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>();
- herder.putConnectorConfig(name, configs, false, cb);
+ TargetState targetState = null;
+ if (createRequest.initialState() != null) {
+ targetState = createRequest.initialState().toTargetState();
+ }
Review Comment:
This is another place that would benefit from a
`CreateConnectorRequest::targetState` method.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectStandaloneTest {
+
+ private static final String CONNECTOR_NAME = "test-connector";
+ private static final Map<String, String> CONNECTOR_CONFIG = new
HashMap<>();
+ static {
+ CONNECTOR_CONFIG.put(NAME_CONFIG, CONNECTOR_NAME);
+ CONNECTOR_CONFIG.put("key1", "val1");
+ CONNECTOR_CONFIG.put("key2", "val2");
+ }
+
+ private final ConnectStandalone connectStandalone = new
ConnectStandalone();
+ private File connectorConfigurationFile;
+
+ @Before
+ public void setUp() throws IOException {
+ connectorConfigurationFile = TestUtils.tempFile();
+ }
+
+ @Test
+ public void testParseJavaPropertiesFile() throws Exception {
+ Properties properties = new Properties();
+ CONNECTOR_CONFIG.forEach(properties::setProperty);
+
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ properties.store(writer, null);
+ }
+
+ CreateConnectorRequest request =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ assertEquals(CONNECTOR_NAME, request.name());
+ assertEquals(CONNECTOR_CONFIG, request.config());
+ assertNull(request.initialState());
+ }
+
+ @Test
+ public void testParseJsonFileWithConnectorConfiguration() throws Exception
{
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ writer.write(new
ObjectMapper().writeValueAsString(CONNECTOR_CONFIG));
+ }
+
+ CreateConnectorRequest request =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ assertEquals(CONNECTOR_NAME, request.name());
+ assertEquals(CONNECTOR_CONFIG, request.config());
+ assertNull(request.initialState());
+ }
+
+ @Test
+ public void testParseJsonFileWithCreateConnectorRequest() throws Exception
{
Review Comment:
Worth adding a case for this format with no initial state as well?
##########
docs/connect.html:
##########
@@ -60,7 +60,7 @@ <h4><a id="connect_running" href="#connect_running">Running
Kafka Connect</a></h
<p>Starting with 2.3.0, client configuration overrides can be configured
individually per connector by using the prefixes
<code>producer.override.</code> and <code>consumer.override.</code> for Kafka
sources or Kafka sinks respectively. These overrides are included with the rest
of the connector's configuration properties.</p>
- <p>The remaining parameters are connector configuration files. You may
include as many as you want, but all will execute within the same process (on
different threads). You can also choose not to specify any connector
configuration files on the command line, and instead use the REST API to create
connectors at runtime after your standalone worker starts.</p>
+ <p>The remaining parameters are connector configuration files. Each file
may either be a Java Properties files or a JSON file containing an object with
the same structure as the request body of either the <code>POST
/connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code>
endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI
documentation</a>). You may include as many as you want, but all will execute
within the same process (on different threads). You can also choose not to
specify any connector configuration files on the command line, and instead use
the REST API to create connectors at runtime after your standalone worker
starts.</p>
Review Comment:
Nit: typo
```suggestion
<p>The remaining parameters are connector configuration files. Each file
may either be a Java Properties file or a JSON file containing an object with
the same structure as the request body of either the <code>POST
/connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code>
endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI
documentation</a>). You may include as many as you want, but all will execute
within the same process (on different threads). You can also choose not to
specify any connector configuration files on the command line, and instead use
the REST API to create connectors at runtime after your standalone worker
starts.</p>
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java:
##########
@@ -42,17 +47,47 @@ public Map<String, String> config() {
return config;
}
+ @JsonProperty
+ public InitialState initialState() {
+ return initialState;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateConnectorRequest that = (CreateConnectorRequest) o;
return Objects.equals(name, that.name) &&
- Objects.equals(config, that.config);
+ Objects.equals(config, that.config) &&
+ Objects.equals(initialState, that.initialState);
}
@Override
public int hashCode() {
- return Objects.hash(name, config);
+ return Objects.hash(name, config, initialState);
+ }
+
+ public enum InitialState {
+ RUNNING,
+ PAUSED,
+ STOPPED;
+
+ @JsonCreator
+ public static InitialState forValue(String value) {
+ return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
+ }
+
+ public TargetState toTargetState() {
+ switch (this) {
+ case RUNNING:
+ return TargetState.STARTED;
+ case PAUSED:
+ return TargetState.PAUSED;
+ case STOPPED:
+ return TargetState.STOPPED;
+ default:
+ throw new IllegalArgumentException("Unknown initial
state");
Review Comment:
Can we include the state here?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -108,6 +108,19 @@ public interface Herder {
*/
void putConnectorConfig(String connName, Map<String, String> config,
boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
+ /**
+ * Set the configuration for a connector, along with a target state
optionally. This supports creation and updating.
+ * @param connName name of the connector
+ * @param config the connector's configuration
+ * @param targetState the desired target state for the connector; may be
{@code null} if no target state change is desired. Note that the default
+ * target state is {@link TargetState#STARTED} if no
target state exists previously
+ * @param allowReplace if true, allow overwriting previous configs; if
false, throw {@link AlreadyExistsException}
+ * if a connector with the same name already exists
+ * @param callback callback to invoke when the configuration has been
written
+ */
+ void putConnectorConfig(String connName, Map<String, String> config,
TargetState targetState, boolean allowReplace,
Review Comment:
Why introduce a new method? My first instinct would've been to add a new
parameter to the existing `Herder::putConnectorConfig` method.
##########
docs/connect.html:
##########
@@ -41,7 +41,7 @@ <h4><a id="connect_running" href="#connect_running">Running
Kafka Connect</a></h
<p>In standalone mode all work is performed in a single process. This
configuration is simpler to setup and get started with and may be useful in
situations where only one worker makes sense (e.g. collecting log files), but
it does not benefit from some of the features of Kafka Connect such as fault
tolerance. You can start a standalone process with the following command:</p>
<pre class="brush: bash;">
-> bin/connect-standalone.sh config/connect-standalone.properties
[connector1.properties connector2.properties ...]</pre>
+> bin/connect-standalone.sh config/connect-standalone.properties
[connector1.properties connector2.json ...]</pre>
Review Comment:
Do we also want to update the
[usage](https://github.com/apache/kafka/blob/edc7e10a745c350ad1efa9e4866370dc8ea0e034/bin/connect-standalone.sh#L19)
for `connect-standalone.sh`?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect
connect, String[] extraAr
}
}
+ /**
+ * Parse a connector configuration file into a {@link
CreateConnectorRequest}. The file can have any one of the following formats:
+ * <ol>
+ * <li>A JSON file containing an Object with only String keys and
values that represent the connector configuration.</li>
+ * <li>A JSON file containing an Object that can be parsed directly
into a {@link CreateConnectorRequest}</li>
+ * <li>A valid Java Properties file (i.e. containing String key/value
pairs representing the connector configuration)</li>
+ * </ol>
+ * <p>
+ * Visible for testing.
+ *
+ * @param filePath the path of the connector configuration file
+ * @return the parsed connector configuration in the form of a {@link
CreateConnectorRequest}
+ */
+ CreateConnectorRequest parseConnectorConfigurationFile(String filePath)
throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ File connectorConfigurationFile = Paths.get(filePath).toFile();
+ try {
+ Map<String, String> connectorConfigs = objectMapper.readValue(
+ connectorConfigurationFile,
+ new TypeReference<Map<String, String>>() { });
+
+ if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+ throw new ConnectException("Connector configuration at '" +
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+ + "configuration");
+ }
+ return new
CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs,
null);
+ } catch (StreamReadException | DatabindException e) {
+ log.debug("Could not parse connector configuration file '{}' into
a Map with String keys and values", filePath);
+ }
+
+ try {
+ CreateConnectorRequest createConnectorRequest =
objectMapper.readValue(connectorConfigurationFile,
+ new TypeReference<CreateConnectorRequest>() { });
+ if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+ if
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
{
+ throw new ConnectException("Connector name configuration
in 'config' doesn't match the one specified in 'name'");
+ }
+ } else {
+ createConnectorRequest.config().put(NAME_CONFIG,
createConnectorRequest.name());
+ }
+ return createConnectorRequest;
+ } catch (StreamReadException | DatabindException e) {
+ log.debug("Could not parse connector configuration file '{}' into
an object of type {}",
+ filePath, CreateConnectorRequest.class.getSimpleName());
+ }
+
+ Map<String, String> connectorConfigs =
Utils.propsToStringMap(Utils.loadProps(filePath));
Review Comment:
This has the interesting behavior that files with this content:
```json
{
"name": "local-file-sink",
"other.key": "test",
"config": {
"name": "local-file-sink",
"connector.class": "FileStreamSink",
"tasks.max": "1",
"file": "test.sink.txt",
"topics": "connect-test"
}
}
```
will fail to be parsed as `Map<String, String>`, then fail to be parsed as a
`CreateConnectorRequest`, and will finally be parsed as a Java properties file,
at which point this error message will be generated before startup is aborted:
> org.apache.kafka.connect.errors.ConnectException: Connector configuration
at 'connect-file-sink-wrapped.json' is missing the mandatory 'name'
configuration
IMO it'd be a bit friendlier to ignore unknown fields when attempting to
parse the file as a `CreateConnectorRequest`. Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]