Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante merged PR #6934: URL: https://github.com/apache/kafka/pull/6934 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on PR #6934: URL: https://github.com/apache/kafka/pull/6934#issuecomment-2102553283 Thank you @C0urante. I updated the description (please tell me if I need to add more) there. Do you want to me squash the fixups or you squash when merge? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1595365475 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -773,6 +773,41 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time"); } +@Test +public void testPatchConnectorConfig() throws Exception { +connect = connectBuilder.build(); +// start the clusters +connect.start(); + +connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, +"Initial group of workers did not start in time."); + +connect.kafka().createTopic(TOPIC_NAME); + +Map props = defaultSinkConnectorProps(TOPIC_NAME); +props.put("unaffected-key", "unaffected-value"); +props.put("to-be-deleted-key", "value"); +props.put(TASKS_MAX_CONFIG, "1"); + +Map patch = new HashMap<>(); +patch.put(TASKS_MAX_CONFIG, "2"); // this plays as a value to be changed +patch.put("to-be-added-key", "value"); +patch.put("to-be-deleted-key", null); + +connect.configureConnector(CONNECTOR_NAME, props); +connect.patchConnectorConfig(CONNECTOR_NAME, patch); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2, +"connector and tasks did not start in time"); + +Map expectedConfig = new HashMap<>(props); +expectedConfig.put("name", CONNECTOR_NAME); +expectedConfig.put("to-be-added-key", "value"); +expectedConfig.put(TASKS_MAX_CONFIG, "2"); +expectedConfig.remove("to-be-deleted-key"); +assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config()); Review Comment: Thanks, great suggestion. Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1595357027 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final Map connectorConfigPatch) throws Throwable { +FutureCallback> cb = new FutureCallback<>(); +herder.patchConnectorConfig(connector, connectorConfigPatch, cb); +Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", +"PATCH", headers, connectorConfigPatch, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); +return Response.ok().entity(createdInfo.result()).build(); Review Comment: Yes, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -773,6 +773,41 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time"); } +@Test +public void testPatchConnectorConfig() throws Exception { +connect = connectBuilder.build(); +// start the clusters +connect.start(); + +connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, +"Initial group of workers did not start in time."); + +connect.kafka().createTopic(TOPIC_NAME); + +Map props = defaultSinkConnectorProps(TOPIC_NAME); +props.put("unaffected-key", "unaffected-value"); +props.put("to-be-deleted-key", "value"); +props.put(TASKS_MAX_CONFIG, "1"); + +Map patch = new HashMap<>(); +patch.put(TASKS_MAX_CONFIG, "2"); // this plays as a value to be changed +patch.put("to-be-added-key", "value"); +patch.put("to-be-deleted-key", null); + +connect.configureConnector(CONNECTOR_NAME, props); +connect.patchConnectorConfig(CONNECTOR_NAME, patch); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2, +"connector and tasks did not start in time"); + +Map expectedConfig = new HashMap<>(props); +expectedConfig.put("name", CONNECTOR_NAME); +expectedConfig.put("to-be-added-key", "value"); +expectedConfig.put(TASKS_MAX_CONFIG, "2"); +expectedConfig.remove("to-be-deleted-key"); +assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config()); Review Comment: I think it's possible for poor timing (which Jenkins is notorious for...) to create flakiness here. The connector and both of its tasks may be started, but it's possible that the worker we hit with this request won't have read the patched connector config from the config topic yet if it's not the leader of the cluster. As a quick hack, we could tweak the order of operations and rely on existing retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` to prevent this: 1. Configure connector with `tasks.max = 2` 2. Ensure connector is started and 2 tasks are running 3. Patch connector, including changing `tasks.max` to `3` 4. Ensure connector is started and 3 tasks are running 5. Perform the assertion on this line (i.e., that the connector config as reported by an arbitrary worker in the cluster matches the expected patch config) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on PR #6934: URL: https://github.com/apache/kafka/pull/6934#issuecomment-2098991457 Thanks @ivanyu, this is really close. One other thing--can you update the description with a brief overview of the PR (probably enough to just mention the new endpoint and its behavior), and remove the italicized template? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -773,6 +773,41 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time"); } +@Test +public void testPatchConnectorConfig() throws Exception { +connect = connectBuilder.build(); +// start the clusters +connect.start(); + +connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, +"Initial group of workers did not start in time."); + +connect.kafka().createTopic(TOPIC_NAME); + +Map props = defaultSinkConnectorProps(TOPIC_NAME); +props.put("unaffected-key", "unaffected-value"); +props.put("to-be-deleted-key", "value"); +props.put(TASKS_MAX_CONFIG, "1"); + +Map patch = new HashMap<>(); +patch.put(TASKS_MAX_CONFIG, "2"); // this plays as a value to be changed +patch.put("to-be-added-key", "value"); +patch.put("to-be-deleted-key", null); + +connect.configureConnector(CONNECTOR_NAME, props); +connect.patchConnectorConfig(CONNECTOR_NAME, patch); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2, +"connector and tasks did not start in time"); + +Map expectedConfig = new HashMap<>(props); +expectedConfig.put("name", CONNECTOR_NAME); +expectedConfig.put("to-be-added-key", "value"); +expectedConfig.put(TASKS_MAX_CONFIG, "2"); +expectedConfig.remove("to-be-deleted-key"); +assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config()); Review Comment: I think it's possible for poor timing (which Jenkins is notorious for...) to create flakiness here. The connector and both of its tasks may be started, but it's possible that the worker we hit with this request won't have read the patched connector config from the config topic yet. As a quick hack, we could tweak the order of operations and rely on existing retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` to prevent this: 1. Configure connector with `tasks.max = 2` 2. Ensure connector is started and 2 tasks are running 3. Patch connector, and change `tasks.max` to `3` 4. Ensure connector is started and 3 tasks are running 5. Perform the assertion on this line (i.e., that the connector config as reported by an arbitrary worker in the cluster matches the expected patch config) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -773,6 +773,41 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time"); } +@Test +public void testPatchConnectorConfig() throws Exception { +connect = connectBuilder.build(); +// start the clusters +connect.start(); + +connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, +"Initial group of workers did not start in time."); + +connect.kafka().createTopic(TOPIC_NAME); + +Map props = defaultSinkConnectorProps(TOPIC_NAME); +props.put("unaffected-key", "unaffected-value"); +props.put("to-be-deleted-key", "value"); +props.put(TASKS_MAX_CONFIG, "1"); + +Map patch = new HashMap<>(); +patch.put(TASKS_MAX_CONFIG, "2"); // this plays as a value to be changed +patch.put("to-be-added-key", "value"); +patch.put("to-be-deleted-key", null); + +connect.configureConnector(CONNECTOR_NAME, props); +connect.patchConnectorConfig(CONNECTOR_NAME, patch); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2, +"connector and tasks did not start in time"); + +Map expectedConfig = new HashMap<>(props); +expectedConfig.put("name", CONNECTOR_NAME); +expectedConfig.put("to-be-added-key", "value"); +expectedConfig.put(TASKS_MAX_CONFIG, "2"); +expectedConfig.remove("to-be-deleted-key"); +assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config()); Review Comment: I think it's possible for poor timing (which Jenkins is notorious for...) to create flakiness here. The connector and both of its tasks may be started, but it's possible that the worker we hit with this request won't have read the patched connector config from the config topic yet if it's not the leader of the cluster. As a quick hack, we could tweak the order of operations and rely on existing retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` to prevent this: 1. Configure connector with `tasks.max = 2` 2. Ensure connector is started and 2 tasks are running 3. Patch connector, and change `tasks.max` to `3` 4. Ensure connector is started and 3 tasks are running 5. Perform the assertion on this line (i.e., that the connector config as reported by an arbitrary worker in the cluster matches the expected patch config) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1592847527 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final Map connectorConfigPatch) throws Throwable { +FutureCallback> cb = new FutureCallback<>(); +herder.patchConnectorConfig(connector, connectorConfigPatch, cb); +Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", +"PATCH", headers, connectorConfigPatch, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); +return Response.ok().entity(createdInfo.result()).build(); Review Comment: Yep, LGTM 👍 Can you add that to the KIP? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -2336,6 +2336,133 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } +@Test +public void testPatchConnectorConfigNotFound() { +when(member.memberId()).thenReturn("leader"); +expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +ClusterConfigState clusterConfigState = new ClusterConfigState( +0, +null, +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(clusterConfigState); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(NotFoundException.class, exception.getCause()); +} + +@Test +public void testPatchConnectorConfigNotALeader() { +when(member.memberId()).thenReturn("not-leader"); + +// The connector is pre-existing due to the mocks. +ClusterConfigState originalSnapshot = new ClusterConfigState( +1, +null, +Collections.singletonMap(CONN1, 0), +Collections.singletonMap(CONN1, CONN1_CONFIG), +Collections.singletonMap(CONN1, TargetState.STARTED), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(originalSnapshot); +when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + +// Patch the connector config. + +expectMemberEnsureActive(); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException fencingException = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(ConnectException.class, fencingException.getCause()); +} + +@Test +public void testPatchConnectorConfig() throws Exception { +when(member.memberId()).thenReturn("leader"); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +Map originalConnConfig = new HashMap<>(CONN1_CONFIG); +originalConnConfig.put("foo0", "unaffected"); +originalConnConfig.put("foo1", "will-be-changed"); +originalConnConfig.put("foo2", "will-be-removed"); + +// The connector is pre-existing due to the mocks. + +ClusterConfigState originalSnapshot = new Cl
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1592838802 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map ); } +@Override +public void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { +log.trace("Submitting connector config patch request {}", connName); +addRequest(() -> { Review Comment: Yep, looks great! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on PR #6934: URL: https://github.com/apache/kafka/pull/6934#issuecomment-2092245514 I also added the integration test as you proposed @C0urante -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1588022076 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map ); } +@Override +public void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { +log.trace("Submitting connector config patch request {}", connName); +addRequest(() -> { +ConnectorInfo connectorInfo = connectorInfo(connName); +if (connectorInfo == null) { +callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); +} else { +Map patchedConfig = applyConnectorConfigPatch(connectorInfo.config(), configPatch); +putConnectorConfig(connName, patchedConfig, true, callback); Review Comment: Yes, makes sense, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587989524 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map ); } +@Override +public void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { +log.trace("Submitting connector config patch request {}", connName); +addRequest(() -> { Review Comment: I added this check + a test for this (does it make sense to have it?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587970461 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } +@Test +public void testPatchConnectorConfigNotFound() { +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +Callback> patchCallback = mock(Callback.class); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + +ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); +verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); +assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); +assertNull(exceptionCaptor.getValue().getCause()); +} + +@Test +public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { +// Create the connector. +Map originalConnConfig = connectorConfig(SourceSink.SOURCE); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "changed"); +connConfigPatch.put("foo2", null); +connConfigPatch.put("foo3", "added"); + +Map patchedConnConfig = new HashMap<>(originalConnConfig); +patchedConnConfig.put("foo1", "changed"); +patchedConnConfig.remove("foo2"); +patchedConnConfig.put("foo3", "added"); + +expectAdd(SourceSink.SOURCE); +Connector connectorMock = mock(SourceConnector.class); +expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig); + +expectConnectorStartingWithoutTasks(originalConnConfig); + +herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback); +createCallback.get(1000L, TimeUnit.SECONDS); + +expectConnectorStartingWithoutTasks(patchedConnConfig); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + +Map returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config(); +assertEquals(patchedConnConfig, returnedConfig); + +// Also check the returned config when requested. +FutureCallback> configCallback = new FutureCallback<>(); +herder.connectorConfig(CONNECTOR_NAME, configCallback); + +Map returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS); +assertEquals(patchedConnConfig, returnedConfig2); +} + +private void expectConnectorStartingWithoutTasks(Map config) { Review Comment: Thanks, did the latter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587966298 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } +@Test +public void testPatchConnectorConfigNotFound() { +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +Callback> patchCallback = mock(Callback.class); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + +ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); +verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); +assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); +assertNull(exceptionCaptor.getValue().getCause()); +} + +@Test +public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { +// Create the connector. +Map originalConnConfig = connectorConfig(SourceSink.SOURCE); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "changed"); +connConfigPatch.put("foo2", null); +connConfigPatch.put("foo3", "added"); + +Map patchedConnConfig = new HashMap<>(originalConnConfig); +patchedConnConfig.put("foo1", "changed"); +patchedConnConfig.remove("foo2"); +patchedConnConfig.put("foo3", "added"); + +expectAdd(SourceSink.SOURCE); +Connector connectorMock = mock(SourceConnector.class); Review Comment: Yeah, a rebase artifact, deleted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587963241 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } +@Test +public void testPatchConnectorConfigNotFound() { +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +Callback> patchCallback = mock(Callback.class); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + +ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); +verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); +assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); +assertNull(exceptionCaptor.getValue().getCause()); Review Comment: I think, none. Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587958141 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } +@Test +public void testPatchConnectorConfigNotFound() { +ClusterConfigState clusterConfigState = new ClusterConfigState( +0, +null, +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(clusterConfigState); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(NotFoundException.class, exception.getCause()); +} + +@Test +public void testPatchConnectorConfig() throws Exception { +when(member.memberId()).thenReturn("leader"); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +Map originalConnConfig = new HashMap<>(CONN1_CONFIG); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); + +// The connector is pre-existing due to the mocks. + +ClusterConfigState originalSnapshot = new ClusterConfigState( +1, +null, +Collections.singletonMap(CONN1, 0), +Collections.singletonMap(CONN1, originalConnConfig), +Collections.singletonMap(CONN1, TargetState.STARTED), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(originalSnapshot); +when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + +expectMemberPoll(); + +// Patch the connector config. +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "changed"); +connConfigPatch.put("foo2", null); +connConfigPatch.put("foo3", "added"); + +Map patchedConnConfig = new HashMap<>(originalConnConfig); +patchedConnConfig.put("foo1", "changed"); +patchedConnConfig.remove("foo2"); +patchedConnConfig.put("foo3", "added"); + +expectMemberEnsureActive(); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); +when(worker.getPlugins()).thenReturn(plugins); Review Comment: Thanks, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587955412 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } +@Test +public void testPatchConnectorConfigNotFound() { +ClusterConfigState clusterConfigState = new ClusterConfigState( +0, +null, +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(clusterConfigState); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(NotFoundException.class, exception.getCause()); +} + +@Test +public void testPatchConnectorConfig() throws Exception { +when(member.memberId()).thenReturn("leader"); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +Map originalConnConfig = new HashMap<>(CONN1_CONFIG); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); + +// The connector is pre-existing due to the mocks. + +ClusterConfigState originalSnapshot = new ClusterConfigState( +1, +null, +Collections.singletonMap(CONN1, 0), +Collections.singletonMap(CONN1, originalConnConfig), +Collections.singletonMap(CONN1, TargetState.STARTED), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(originalSnapshot); +when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + +expectMemberPoll(); + +// Patch the connector config. +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "changed"); +connConfigPatch.put("foo2", null); +connConfigPatch.put("foo3", "added"); + +Map patchedConnConfig = new HashMap<>(originalConnConfig); +patchedConnConfig.put("foo1", "changed"); +patchedConnConfig.remove("foo2"); +patchedConnConfig.put("foo3", "added"); + +expectMemberEnsureActive(); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); +when(worker.getPlugins()).thenReturn(plugins); + +ArgumentCaptor> validateCallback = ArgumentCaptor.forClass(Callback.class); +doAnswer(invocation -> { +validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS); +return null; +}).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture()); + +// This is effectively the main check of this test: +// we validate that what's written in the config storage is the patched config. +doNothing().when(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull()); Review Comment: Yes, makes sense, applied -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587938094 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } +@Test +public void testPatchConnectorConfigNotFound() { +ClusterConfigState clusterConfigState = new ClusterConfigState( +0, +null, +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(clusterConfigState); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(NotFoundException.class, exception.getCause()); +} + +@Test +public void testPatchConnectorConfig() throws Exception { +when(member.memberId()).thenReturn("leader"); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +Map originalConnConfig = new HashMap<>(CONN1_CONFIG); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); Review Comment: Yes, good idea. Done here and in `StandaloneHerderTest` too. ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } +@Test +public void testPatchConnectorConfigNotFound() { +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +Callback> patchCallback = mock(Callback.class); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + +ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); +verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); +assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); +assertNull(exceptionCaptor.getValue().getCause()); +} + +@Test +public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { +// Create the connector. +Map originalConnConfig = connectorConfig(SourceSink.SOURCE); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587929296 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String connName, } } +@Override +public void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { +try { +ConnectorInfo connectorInfo = connectorInfo(connName); +if (connectorInfo == null) { +callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); +return; +} + +Map patchedConfig = applyConnectorConfigPatch(connectorInfo.config(), configPatch); +validateConnectorConfig(patchedConfig, (error, configInfos) -> { +if (error != null) { +callback.onCompletion(error, null); +return; +} + +requestExecutorService.submit( +() -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos) +); +}); +} catch (ConnectException e) { Review Comment: Ack, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587926661 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String connName, } } +@Override +public void patchConnectorConfig(String connName, Map configPatch, Callback> callback) { Review Comment: Yes, makes sense. Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587918212 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, Review Comment: Thanks, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587904583 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final Map connectorConfigPatch) throws Throwable { +FutureCallback> cb = new FutureCallback<>(); +herder.patchConnectorConfig(connector, connectorConfigPatch, cb); +Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", +"PATCH", headers, connectorConfigPatch, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); +return Response.ok().entity(createdInfo.result()).build(); Review Comment: I didn't find the current `PUT` response documentation, so I propose something like this: ``` Responses follow the model of the configuration PUT endpoint: 1. If the patch was successfully applied, the response code is 200 and the body is a JSON object with the updated connector information (`name`, `type`, `config`, and `tasks`), for example: { "name": "my-connector", "config": { "name": "my-connector", "sample_config_2": "test_config_2", "sample_config": "test_config_new" }, "tasks": [ { "connector": "my-connector", "task": 0 }, { "connector": "my-connector", "task": 1 } ], "type": "sink" } 2. In case of errors, the response code matches the error type (e.g. 400 in case of a config validation error; 404 if the connector is not found; 500 in case of other server-side errors) and the body is a JSON object with the error details: { "error_code": 400, "message": "Connector configuration is invalid and contains the following 1 error(s):\n...\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" } ``` WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
ivanyu commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1587717243 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -992,4 +992,15 @@ public List setWorkerLoggerLevel(String namespace, String desiredLevelSt return loggers.setLevel(namespace, level); } +protected Map applyConnectorConfigPatch(Map currentConfig, Map configPatch) { +Map patchedConfig = new HashMap<>(currentConfig); +configPatch.forEach((k, v) -> { +if (v != null) { +patchedConfig.put(k, v); +} else { +patchedConfig.remove(k); +} +}); +return patchedConfig; +} Review Comment: I did this, extracted into `ConnectUtils` (because I think the `null` processing logic may be connect-specific) + added a unit test for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final Map connectorConfigPatch) throws Throwable { +FutureCallback> cb = new FutureCallback<>(); +herder.patchConnectorConfig(connector, connectorConfigPatch, cb); +Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", +"PATCH", headers, connectorConfigPatch, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); +return Response.ok().entity(createdInfo.result()).build(); Review Comment: Just realizing now that we don't actually specify the status and body of the REST response in the KIP. I agree with what's here, especially since it matches the existing `PUT /connectors/{name}/config` endpoint, but it's worth specifying in the KIP for completeness. ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } +@Test +public void testPatchConnectorConfigNotFound() { +ClusterConfigState clusterConfigState = new ClusterConfigState( +0, +null, +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(clusterConfigState); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(NotFoundException.class, exception.getCause()); +} + +@Test +public void testPatchConnectorConfig() throws Exception { +when(member.memberId()).thenReturn("leader"); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +Map originalConnConfig = new HashMap<>(CONN1_CONFIG); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); Review Comment: Nit: can we add one more key/value pair that should be unchanged after the patch is applied? This would catch bugs where the set of post-patch keys is derived from the patch instead of the combination of the patch and the prior configuration. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, Review Comment: Nit: ```suggestion final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } +@Test +public void testPatchConnectorConfigNotFound() { +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +Callback> patchCallback = mock(Callback.class); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);