[GitHub] [kafka] kkonstantine commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
kkonstantine commented on pull request #8654: URL: https://github.com/apache/kafka/pull/8654#issuecomment-631879517 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
kkonstantine commented on a change in pull request #8654: URL: https://github.com/apache/kafka/pull/8654#discussion_r428443080 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the creation of internal topics. 
+ */ +@Category(IntegrationTest.class) +public class InternalTopicsIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); + +private EmbeddedConnectCluster.Builder connectBuilder; +private EmbeddedConnectCluster connect; +Map workerProps = new HashMap<>(); +Properties brokerProps = new Properties(); + +@Before +public void setup() { +// setup Kafka broker properties +brokerProps.put("auto.create.topics.enable", String.valueOf(false)); + +// build a Connect cluster backed by Kafka and Zk +connectBuilder = new EmbeddedConnectCluster.Builder() +.name("connect-cluster") +.numWorkers(1) +.numBrokers(1) +.brokerProps(brokerProps); +} + +@After +public void close() { +// stop all Connect, Kafka and Zk threads. +connect.stop(); +} + +@Test +public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException { +int numWorkers = 1; +int numBrokers = 3; +connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") Review comment: Makes sense. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -453,11 +453,17 @@ public void putSessionKey(SessionKey sessionKey) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); -NewTopic topicDescription = TopicAdmin.defineTopic(topic). -compacted(). -partitions(1). - replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)). -build(); + +Map topicSettings = null; +if (config instanceof DistributedConfig) { +topicSettings = ((DistributedConfig) config).configStorageTopicSettings(); +} Review comment: ```suggestion Map topicSettings = config instanceof DistributedConfig ? 
((DistributedConfig) config).configStorageTopicSettings() : Collections.emptyMap(); ``` I know that `TopicAdmin#defineTopic` checks for `null`, but I think using `null` with collections is better to do when such optimization matters. Wdyt? (btw you don't have to use the ternary operator, I just added it to make the suggestion clear). Also, if you change here, please change in the other files too. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ## @@ -400,6 +424,33 @@ public KeyGenerator getInternalRequestKeyGenerator() { } } +private Map topicSettings(String prefix) { +Map result = originalsWithPrefix(prefix); +if (CONFIG_STORAGE_PREFIX.equals(prefix) && result.containsKey(PARTITIONS_SUFFIX)) { +log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1", prefix, PARTITIONS_SUFFIX, result.get("partitions")); +} +Object removedPolicy = result.remove("cleanup.policy"); Review comment: It's one more impor
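The `null`-versus-empty-map point in the review comment above can be illustrated with a minimal, self-contained sketch. The classes `WorkerConfigSketch` and `DistributedWorkerConfigSketch`, the method `topicSettingsFor`, and the sample setting are hypothetical stand-ins chosen for illustration; they are not the Kafka classes under review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a generic worker config; not the real Kafka class.
class WorkerConfigSketch {
}

// Hypothetical stand-in for a distributed worker config that carries topic settings.
class DistributedWorkerConfigSketch extends WorkerConfigSketch {
    Map<String, Object> configStorageTopicSettings() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("min.insync.replicas", "2"); // sample value, assumed for illustration
        return settings;
    }
}

class TopicSettingsExample {
    // Returns the topic settings, or an immutable empty map instead of null,
    // so callers never need a null check.
    static Map<String, Object> topicSettingsFor(WorkerConfigSketch config) {
        return config instanceof DistributedWorkerConfigSketch
                ? ((DistributedWorkerConfigSketch) config).configStorageTopicSettings()
                : Collections.emptyMap();
    }

    public static void main(String[] args) {
        System.out.println(topicSettingsFor(new DistributedWorkerConfigSketch()).size());
        System.out.println(topicSettingsFor(new WorkerConfigSketch()).isEmpty());
    }
}
```

Whether the ternary or an `if` statement is used is a matter of taste, as the comment itself notes; the relevant part is that the fallback is an empty collection rather than `null`.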
[GitHub] [kafka] cmccabe opened a new pull request #8703: MINOR: add a getOrCreate function to KRPC collections
cmccabe opened a new pull request #8703: URL: https://github.com/apache/kafka/pull/8703
[GitHub] [kafka] pgwhalen commented on pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API
pgwhalen commented on pull request #6824: URL: https://github.com/apache/kafka/pull/6824#issuecomment-631873644 Retest this please.
[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r428370254 ## File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an immutable basic version range using 2 attributes: min and max of type long. + * The min and max attributes are expected to be >= 1, and with max >= min. + * + * The class also provides API to serialize/deserialize the version range to/from a map. + * The class allows for configurable labels for the min/max attributes, which can be specialized by + * sub-classes (if needed). + */ +class BaseVersionRange { Review comment: Do we want to get a unit test class for `BaseVersionRange`? ## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param is the type of version range. + * @see SupportedVersionRange + * @see FinalizedVersionRange + */ +public class Features { +private final Map features; + +/** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ +private Features(Map features) { +this.features = features; +} + +/** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. 
+ */ +public static Features supportedFeatures(Map features) { +return new Features<>(features); +} + +/** + * @param features Map of feature name to FinalizedVersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ +public static Features finalizedFeatures(Map features) { +return new Features<>(features); +} + +// Visible for testing. +public static Features emptyFinalizedFeatures() { +return new Features<>(new HashMap<>()); +} + +public static Features emptySupportedFeatures() { +return new Features<>(new HashMap<>()); +} + +public Map features() { +return features; +} + +public boolean empty() { +return features.isEmpty(); +} + +/** +
[GitHub] [kafka] ijuma merged pull request #8472: KAFKA-9855 - return cached Structs for Schemas with no fields
ijuma merged pull request #8472: URL: https://github.com/apache/kafka/pull/8472
[GitHub] [kafka] ijuma commented on pull request #8472: KAFKA-9855 - return cached Structs for Schemas with no fields
ijuma commented on pull request #8472: URL: https://github.com/apache/kafka/pull/8472#issuecomment-631870599 I have a WIP PR that removes structs altogether, but this is a good improvement in the meantime. Merging to trunk. Thanks.
[jira] [Updated] (KAFKA-9950) MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-9950: -- Fix Version/s: 2.5.1 2.4.2 2.6.0 > MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException > - > > Key: KAFKA-9950 > URL: https://issues.apache.org/jira/browse/KAFKA-9950 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > The > [MirrorConnectorConfig::CONNECTOR_CONFIG_DEF|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L397] > object is reused across multiple MirrorMaker2 classes, which is fine for the > most part since it's a constant. However, the actual {{ConfigDef}} object > itself is mutable, and is mutated when the {{MirrorTaskConfig}} class > [statically constructs its own > ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java#L62]. > This has two unintended effects: > # Since the two {{ConfigDef}} objects for the {{MirrorConnectorConfig}} and > {{MirrorTaskConfig}} classes are actually the same object, the additional > properties that the {{MirrorTaskConfig}} class defines for its {{ConfigDef}} > are also added to the {{MirrorConnectorConfig}} class's {{ConfigDef}}. The > impact of this isn't huge since both additional properties have default > values, but this does cause those properties to appear in the > {{/connectors/\{name}/config/validate}} endpoint once the > {{MirrorTaskConfig}} class is loaded for the first time. 
> # It's possible that, if a config for a MirrorMaker2 connector is submitted > at approximately the same time that the {{MirrorTaskConfig}} class is loaded, > a {{ConcurrentModificationException}} will be thrown by the > {{AbstractHerder}} class when it tries to [iterate over all of the keys of > the connector's > ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L357]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
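The first effect described in this issue (a task-level definition silently adding keys to the shared connector-level definition) can be sketched without Kafka on the classpath. `SimpleConfigDef` below is a hypothetical, heavily simplified stand-in for a mutable config definition, and the key names are made up for illustration; the copy-before-define approach mirrors the title of the fix ("Construct new ConfigDef for MirrorTaskConfig before defining new properties") but is not the actual patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a mutable config definition; not Kafka's ConfigDef.
class SimpleConfigDef {
    private final Map<String, String> keys = new LinkedHashMap<>();

    SimpleConfigDef() {
    }

    // Copy constructor: the new definition starts with the same keys but shares no state.
    SimpleConfigDef(SimpleConfigDef base) {
        keys.putAll(base.keys);
    }

    // Adds a key and returns this instance, which is what makes accidental sharing easy.
    SimpleConfigDef define(String name, String doc) {
        keys.put(name, doc);
        return this;
    }

    boolean contains(String name) {
        return keys.containsKey(name);
    }

    int size() {
        return keys.size();
    }
}

class SharedConfigDefExample {
    // Problematic: defines the task-only key directly on the connector definition.
    static SimpleConfigDef taskDefSharing(SimpleConfigDef connectorDef) {
        return connectorDef.define("task.only.key", "hypothetical task-level property");
    }

    // Safer: copies the connector definition first, then defines the task-only key on the copy.
    static SimpleConfigDef taskDefCopying(SimpleConfigDef connectorDef) {
        return new SimpleConfigDef(connectorDef).define("task.only.key", "hypothetical task-level property");
    }

    public static void main(String[] args) {
        SimpleConfigDef shared = new SimpleConfigDef().define("topics", "topics to replicate");
        taskDefSharing(shared);
        System.out.println("shared def polluted: " + shared.contains("task.only.key"));

        SimpleConfigDef isolated = new SimpleConfigDef().define("topics", "topics to replicate");
        taskDefCopying(isolated);
        System.out.println("copied def polluted: " + isolated.contains("task.only.key"));
    }
}
```

Copying also addresses the second effect in principle: if the connector-level definition is never mutated after construction, iterating over its keys cannot race with a later `define` call.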
[jira] [Resolved] (KAFKA-9950) MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-9950. --- Resolution: Fixed > MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException > - > > Key: KAFKA-9950 > URL: https://issues.apache.org/jira/browse/KAFKA-9950 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > The > [MirrorConnectorConfig::CONNECTOR_CONFIG_DEF|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L397] > object is reused across multiple MirrorMaker2 classes, which is fine for the > most part since it's a constant. However, the actual {{ConfigDef}} object > itself is mutable, and is mutated when the {{MirrorTaskConfig}} class > [statically constructs its own > ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java#L62]. > This has two unintended effects: > # Since the two {{ConfigDef}} objects for the {{MirrorConnectorConfig}} and > {{MirrorTaskConfig}} classes are actually the same object, the additional > properties that the {{MirrorTaskConfig}} class defines for its {{ConfigDef}} > are also added to the {{MirrorConnectorConfig}} class's {{ConfigDef}}. The > impact of this isn't huge since both additional properties have default > values, but this does cause those properties to appear in the > {{/connectors/\{name}/config/validate}} endpoint once the > {{MirrorTaskConfig}} class is loaded for the first time. 
> # It's possible that, if a config for a MirrorMaker2 connector is submitted > at approximately the same time that the {{MirrorTaskConfig}} class is loaded, > a {{ConcurrentModificationException}} will be thrown by the > {{AbstractHerder}} class when it tries to [iterate over all of the keys of > the connector's > ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L357].
[GitHub] [kafka] kkonstantine commented on pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions
kkonstantine commented on pull request #8511: URL: https://github.com/apache/kafka/pull/8511#issuecomment-631869695 ok to test
[GitHub] [kafka] kkonstantine commented on a change in pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions
kkonstantine commented on a change in pull request #8511: URL: https://github.com/apache/kafka/pull/8511#discussion_r428436342 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java ## @@ -86,7 +86,7 @@ public ConnectorHealth connectorHealth(String connName) { FutureCallback> connectorConfigCallback = new FutureCallback<>(); herder.connectorConfig(connName, connectorConfigCallback); try { -return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS); +return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);); Review comment: My bad. My suggestion inserted a typo. At least I saw it before I started the build. ```suggestion return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS)); ```
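The change under review wraps the returned config in `new HashMap<>(...)` so that callers receive a copy. A minimal sketch of why that matters, using a hypothetical `ConnectorConfigHolder` class rather than the actual `ConnectClusterStateImpl`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder of a connector's config; stands in for internal state
// that should not be modifiable by external callers such as extensions.
class ConnectorConfigHolder {
    private final Map<String, String> connectorConfig = new HashMap<>();

    ConnectorConfigHolder() {
        connectorConfig.put("name", "example-connector"); // sample entry, assumed for illustration
    }

    // Hands out the internal map itself: callers can change internal state.
    Map<String, String> configShared() {
        return connectorConfig;
    }

    // Hands out a copy: caller-side changes stay local to the caller.
    Map<String, String> configCopied() {
        return new HashMap<>(connectorConfig);
    }

    int size() {
        return connectorConfig.size();
    }

    public static void main(String[] args) {
        ConnectorConfigHolder holder = new ConnectorConfigHolder();
        holder.configCopied().put("injected", "value");
        System.out.println("after modifying copy: " + holder.size());
        holder.configShared().put("injected", "value");
        System.out.println("after modifying shared map: " + holder.size());
    }
}
```

`new HashMap<>(map)` is a shallow copy, which is sufficient here under the assumption that keys and values are immutable strings.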
[GitHub] [kafka] kkonstantine commented on pull request #8444: KAFKA-8869: Remove task configs for deleted connectors from config snapshot
kkonstantine commented on pull request #8444: URL: https://github.com/apache/kafka/pull/8444#issuecomment-631868755 Merged to `trunk` and backported to `2.5`, `2.4` and `2.3`
[GitHub] [kafka] kkonstantine merged pull request #8608: KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties
kkonstantine merged pull request #8608: URL: https://github.com/apache/kafka/pull/8608
[GitHub] [kafka] kkonstantine commented on pull request #8608: KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties
kkonstantine commented on pull request #8608: URL: https://github.com/apache/kafka/pull/8608#issuecomment-631867826 jdk8: single failure on known flaky test: `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]` jdk11: single failure on known flaky test: `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]` jdk14: success
[GitHub] [kafka] kkonstantine commented on pull request #8118: KAFKA-9472: Remove deleted tasks from status store
kkonstantine commented on pull request #8118: URL: https://github.com/apache/kafka/pull/8118#issuecomment-631867151 retest this please
[jira] [Updated] (KAFKA-8869) Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of task removals
[ https://issues.apache.org/jira/browse/KAFKA-8869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-8869: -- Fix Version/s: 2.5.1 2.4.2 2.3.2 > Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of > task removals > --- > > Key: KAFKA-8869 > URL: https://issues.apache.org/jira/browse/KAFKA-8869 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Konstantine Karantasis >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > Investigation of https://issues.apache.org/jira/browse/KAFKA-8676 revealed > another issue: > a map in {{KafkaConfigBackingStore}} keeps growing despite connectors and > tasks eventually getting removed. > This bug does not directly affect rebalancing protocols, but it'd be good to > resolve and use in a way similar to how {{connectorConfigs}} is used.
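The growth pattern described in this issue can be sketched as follows. `ConfigSnapshotSketch` and its methods are hypothetical and only model the general shape of the problem (task entries outliving their connector); they do not reproduce the internals of `KafkaConfigBackingStore`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a config snapshot with separate connector and task maps.
class ConfigSnapshotSketch {
    private final Map<String, String> connectorConfigs = new HashMap<>();
    // Task configs grouped per connector, keyed by task index.
    private final Map<String, Map<Integer, String>> taskConfigs = new HashMap<>();

    void putConnector(String connector, int numTasks) {
        connectorConfigs.put(connector, "connector config");
        Map<Integer, String> tasks = new HashMap<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.put(i, "task config " + i);
        }
        taskConfigs.put(connector, tasks);
    }

    // Leaky removal: forgets the connector but leaves its task configs behind,
    // so the task map only ever grows as connectors come and go.
    void removeConnectorLeaky(String connector) {
        connectorConfigs.remove(connector);
    }

    // Removal that also drops the connector's task configs.
    void removeConnector(String connector) {
        connectorConfigs.remove(connector);
        taskConfigs.remove(connector);
    }

    int storedTaskConfigs() {
        int total = 0;
        for (Map<Integer, String> tasks : taskConfigs.values()) {
            total += tasks.size();
        }
        return total;
    }

    public static void main(String[] args) {
        ConfigSnapshotSketch leaky = new ConfigSnapshotSketch();
        leaky.putConnector("c1", 3);
        leaky.removeConnectorLeaky("c1");
        System.out.println("leaky removal leaves task configs: " + leaky.storedTaskConfigs());

        ConfigSnapshotSketch cleaned = new ConfigSnapshotSketch();
        cleaned.putConnector("c1", 3);
        cleaned.removeConnector("c1");
        System.out.println("full removal leaves task configs: " + cleaned.storedTaskConfigs());
    }
}
```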
[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores
ableegoldman commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r428433779 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -109,7 +94,7 @@ public void logChange(final String storeName, final long timestamp) { throwUnsupportedOperationExceptionIfStandby("logChange"); // Sending null headers to changelog topics (KIP-244) -collector.send( +streamTask.recordCollector().send( Review comment: I tried to reduce the number of unnecessary local variables that could potentially get out of sync
[jira] [Resolved] (KAFKA-8869) Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of task removals
[ https://issues.apache.org/jira/browse/KAFKA-8869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-8869. --- Resolution: Fixed > Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of > task removals > --- > > Key: KAFKA-8869 > URL: https://issues.apache.org/jira/browse/KAFKA-8869 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Konstantine Karantasis >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > Investigation of https://issues.apache.org/jira/browse/KAFKA-8676 revealed > another issue: > a map in {{KafkaConfigBackingStore}} keeps growing despite connectors and > tasks eventually getting removed. > This bug does not directly affect rebalancing protocols, but it'd be good to > resolve and use in a way similar to how {{connectorConfigs}} is used.
[jira] [Updated] (KAFKA-9472) Reducing number of tasks for connector causes deleted tasks to show as UNASSIGNED
[ https://issues.apache.org/jira/browse/KAFKA-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-9472: -- Fix Version/s: 2.5.1 2.4.2 2.6.0 2.3.2 > Reducing number of tasks for connector causes deleted tasks to show as > UNASSIGNED > - > > Key: KAFKA-9472 > URL: https://issues.apache.org/jira/browse/KAFKA-9472 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, > 2.4.0, 2.3.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > If a connector is successfully created with {{t1}} running tasks and then > reconfigured to use {{t1 - n}} tasks (where {{t1}} and {{n}} are both whole > numbers and {{n}} is strictly less than {{t1}}), the connector should then > list {{t1 - n}} total tasks in its status (which can be queried via the > {{/connectors/:name:/status}} endpoint or the {{/connectors}} endpoint with > the {{expand}} URL query parameter set to {{status}}). > However, the connector will instead continue to list {{t1}} total tasks in > its status, with {{n}} of them being listed as {{UNASSIGNED}} and the > remaining {{t1 - n}} of them being listed as {{STARTED}}. > This is because the only time a task status is removed from the status > backing store (as opposed to simply being updated to {{UNASSIGNED}}) is when > its connector is deleted. See relevant code snippets from the > [AbstractHerder|https://github.com/apache/kafka/blob/df13fc93d0aebfe0ecc40dd4af3c5fb19b35f710/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L187-L192] > and > [DistributedHerder|https://github.com/apache/kafka/blob/df13fc93d0aebfe0ecc40dd4af3c5fb19b35f710/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1511-L1520] > classes.
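The arithmetic in this description can be worked through with a small sketch. `TaskStatusSketch` and its methods are hypothetical; the status labels follow the wording of the issue text, and the second method only illustrates the expected outcome (stale statuses removed), not the actual change to the status backing store.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of per-task statuses after reducing a connector from t1 to t1 - n tasks.
class TaskStatusSketch {
    // Reported behavior: surplus tasks are only updated to UNASSIGNED, never removed,
    // so the status still lists t1 entries.
    static Map<Integer, String> shrinkKeepingStale(int t1, int n) {
        Map<Integer, String> statuses = new TreeMap<>();
        for (int task = 0; task < t1; task++) {
            statuses.put(task, task < t1 - n ? "STARTED" : "UNASSIGNED");
        }
        return statuses;
    }

    // Expected behavior: statuses of the n deleted tasks are removed,
    // so the status lists exactly t1 - n entries.
    static Map<Integer, String> shrinkRemovingStale(int t1, int n) {
        Map<Integer, String> statuses = shrinkKeepingStale(t1, n);
        statuses.values().removeIf("UNASSIGNED"::equals);
        return statuses;
    }

    public static void main(String[] args) {
        int t1 = 5;
        int n = 2;
        System.out.println("keeping stale statuses:  " + shrinkKeepingStale(t1, n));
        System.out.println("removing stale statuses: " + shrinkRemovingStale(t1, n));
    }
}
```

With `t1 = 5` and `n = 2`, the first variant lists five tasks (three `STARTED`, two `UNASSIGNED`), while the second lists only the three remaining tasks.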
[GitHub] [kafka] radai-rosenblatt commented on pull request #8472: KAFKA-9855 - return cached Structs for Schemas with no fields
radai-rosenblatt commented on pull request #8472: URL: https://github.com/apache/kafka/pull/8472#issuecomment-631866599 @ijuma - any more modifications required?
[GitHub] [kafka] radai-rosenblatt commented on pull request #8193: be helpful when throwing ConcurrentModificationException out of consumers
radai-rosenblatt commented on pull request #8193: URL: https://github.com/apache/kafka/pull/8193#issuecomment-631866532 @ijuma - any further modifications required?
[GitHub] [kafka] kkonstantine merged pull request #8444: KAFKA-8869: Remove task configs for deleted connectors from config snapshot
kkonstantine merged pull request #8444: URL: https://github.com/apache/kafka/pull/8444
[GitHub] [kafka] pgwhalen commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API
pgwhalen commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r428422376 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ## @@ -953,7 +977,7 @@ void to(final TopicNameExtractor topicExtractor, * flatTransform()}. * * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer} - * @param stateStoreNames the names of the state stores used by the processor + * @param stateStoreNames the names of the state stores used by the transformer, passed only if {@link ConnectedStoreProvider#stores()} is null Review comment: Good catch, I will change this with the revised `KStream` javadocs coming soon. It was a relic of my original proposal; the code was written before we decided to change it and I missed removing this.
[GitHub] [kafka] apovzner edited a comment on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner edited a comment on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-631853083 JDK 14 build has an unrelated test failure: org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false] It looks like tests passed on other builds, but it failed to record the results.
[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner commented on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-631853083 JDK build has an unrelated test failure: org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false] It looks like tests passed on other builds, but it failed to record the results.
[GitHub] [kafka] kkonstantine commented on pull request #8444: KAFKA-8869: Remove task configs for deleted connectors from config snapshot
kkonstantine commented on pull request #8444: URL: https://github.com/apache/kafka/pull/8444#issuecomment-631852881 jdk8: success jdk11: success jdk14: single failure on known flaky test: `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
[jira] [Resolved] (KAFKA-9409) Increase Immutability of ClusterConfigState
[ https://issues.apache.org/jira/browse/KAFKA-9409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-9409. --- Resolution: Fixed > Increase Immutability of ClusterConfigState > --- > > Key: KAFKA-9409 > URL: https://issues.apache.org/jira/browse/KAFKA-9409 > Project: Kafka > Issue Type: Improvement >Reporter: David Mollitor >Priority: Minor > Fix For: 2.6.0 > > > The class claims that it is immutable, but there are some mutable features of > this class. > > Increase the immutability of it and add a little cleanup: > * Pre-initialize size of ArrayList > * Remove superfluous syntax > * Use ArrayList instead of LinkedList since the list is created once
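Two of the cleanup items listed in this issue (pre-sizing an `ArrayList` and exposing state immutably) can be sketched with standard library calls only. `ImmutableTaskListSketch` is a hypothetical class for illustration and is not taken from `ClusterConfigState`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable holder of task names.
final class ImmutableTaskListSketch {
    private final List<String> tasks;

    ImmutableTaskListSketch(List<String> source) {
        // Pre-size the ArrayList: the final size is known, so no resizing is needed.
        List<String> copy = new ArrayList<>(source.size());
        copy.addAll(source);
        // Wrap the private copy so neither the caller's list nor the returned view can change it.
        this.tasks = Collections.unmodifiableList(copy);
    }

    List<String> tasks() {
        return tasks;
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>();
        source.add("task-0");
        ImmutableTaskListSketch holder = new ImmutableTaskListSketch(source);
        source.add("task-1"); // does not affect the holder's copy
        System.out.println(holder.tasks());
        try {
            holder.tasks().add("task-2");
        } catch (UnsupportedOperationException e) {
            System.out.println("modification rejected");
        }
    }
}
```

Wrapping without copying would not be enough: `Collections.unmodifiableList` returns a view, so changes to the caller's original list would still show through.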
[jira] [Updated] (KAFKA-9409) Increase Immutability of ClusterConfigState
[ https://issues.apache.org/jira/browse/KAFKA-9409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-9409: -- Fix Version/s: 2.6.0 > Increase Immutability of ClusterConfigState > --- > > Key: KAFKA-9409 > URL: https://issues.apache.org/jira/browse/KAFKA-9409 > Project: Kafka > Issue Type: Improvement >Reporter: David Mollitor >Priority: Minor > Fix For: 2.6.0 > > > The class claims that it is immutable, but there are some mutable features of > this class. > > Increase the immutability of it and add a little cleanup: > * Pre-initialize size of ArrayList > * Remove superfluous syntax > * Use ArrayList instead of LinkedList since the list is created once -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine merged pull request #7942: KAFKA-9409: Supplement immutability of ClusterConfigState class in Connect
kkonstantine merged pull request #7942: URL: https://github.com/apache/kafka/pull/7942
[GitHub] [kafka] hachikuji commented on pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji commented on pull request #8702: URL: https://github.com/apache/kafka/pull/8702#issuecomment-631849729 retest this please
[GitHub] [kafka] kkonstantine commented on pull request #7942: KAFKA-9409: Increase Immutability of ClusterConfigState
kkonstantine commented on pull request #7942: URL: https://github.com/apache/kafka/pull/7942#issuecomment-631849532 jdk8: 1 flaky failure: `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]` jdk11: didn't start jdk14: success
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112732#comment-17112732 ] victor commented on KAFKA-7500: --- [~ryannedolan] I have two distinct Kafka clusters located in different data centers, DC1 and DC2. How can I organize Kafka producer failover between the two DCs? If the primary Kafka cluster (DC1) becomes unavailable, I want the producer to switch to the failover Kafka cluster (DC2) and continue publishing to it. The producer should also be able to switch back to the primary cluster once it is available again. Are there any good patterns, existing libraries, approaches, or code examples? > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295]
[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id
guozhangwang commented on pull request #8698: URL: https://github.com/apache/kafka/pull/8698#issuecomment-631837772 test this please
[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id
guozhangwang commented on pull request #8698: URL: https://github.com/apache/kafka/pull/8698#issuecomment-631837548 test this
[GitHub] [kafka] omkreddy merged pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g
omkreddy merged pull request #8700: URL: https://github.com/apache/kafka/pull/8700
[GitHub] [kafka] omkreddy commented on pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g
omkreddy commented on pull request #8700: URL: https://github.com/apache/kafka/pull/8700#issuecomment-631829621 Merging as test failures are not related.
[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji commented on a change in pull request #8702: URL: https://github.com/apache/kafka/pull/8702#discussion_r428389929 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -565,8 +566,8 @@ private void recordRebalanceFailure() { // Note that we override the request timeout using the rebalance timeout since that is the // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays. - -int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000); +int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(), Review comment: The previous max check was wrong, but an alternative here is to use rebalanceTimeout + 5s in all cases regardless of the request timeout. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
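The problem with the previous max check can be seen in a small standalone sketch. The class and method names below are hypothetical and are not the AbstractCoordinator code. The second argument of the fixed expression is cut off in the quoted diff, so the sketch assumes it is the rebalance timeout plus the same 5-second slack mentioned in the code comment.

```java
// Standalone sketch; JoinGroupTimeouts and its methods are hypothetical names,
// not part of the Kafka client API.
final class JoinGroupTimeouts {
    // Extra slack for small delays ("an extra 5 seconds" in the quoted comment).
    static final int EXTRA_DELAY_MS = 5000;

    // Previous expression from the diff: for a non-overflowing x,
    // max(x, x + 5000) is always x + 5000, so the max had no effect
    // and the request timeout never acted as a lower bound.
    static int previous(int rebalanceTimeoutMs) {
        return Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + EXTRA_DELAY_MS);
    }

    // Assumed shape of the fix: the default request timeout is the lower bound.
    static int withLowerBound(int defaultRequestTimeoutMs, int rebalanceTimeoutMs) {
        return Math.max(defaultRequestTimeoutMs, rebalanceTimeoutMs + EXTRA_DELAY_MS);
    }

    // Alternative from the review comment: always rebalance timeout + 5s,
    // regardless of the request timeout.
    static int alwaysRebalancePlusDelay(int rebalanceTimeoutMs) {
        return rebalanceTimeoutMs + EXTRA_DELAY_MS;
    }
}
```

The two candidates differ only when the request timeout exceeds the rebalance timeout plus the slack: the lower-bound variant then keeps the larger request timeout, while the alternative ignores it.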
[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.
[ https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112690#comment-17112690 ] victor commented on KAFKA-9981: --- [~ChrisEgerton] mm2 has realized data backup. How does kafkaproduce realize automatic failover transparently? How can the same kafkaproduce object automatically switch between two clusters? > Running a dedicated mm2 cluster with more than one nodes,When the > configuration is updated the task is not aware and will lose the update > operation. > > > Key: KAFKA-9981 > URL: https://issues.apache.org/jira/browse/KAFKA-9981 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: victor >Priority: Major > > DistributedHerder.reconfigureConnector induction config update as follows: > {code:java} > if (changed) { > List> rawTaskProps = reverseTransform(connName, > configState, taskProps); > if (isLeader()) { > configBackingStore.putTaskConfigs(connName, rawTaskProps); > cb.onCompletion(null, null); > } else { > // We cannot forward the request on the same thread because this > reconfiguration can happen as a result of connector > // addition or removal. If we blocked waiting for the response from > leader, we may be kicked out of the worker group. 
> forwardRequestExecutor.submit(new Runnable() { > @Override > public void run() { > try { > String leaderUrl = leaderUrl(); > if (leaderUrl == null || leaderUrl.trim().isEmpty()) { > cb.onCompletion(new ConnectException("Request to > leader to " + > "reconfigure connector tasks failed " + > "because the URL of the leader's REST > interface is empty!"), null); > return; > } > String reconfigUrl = RestServer.urlJoin(leaderUrl, > "/connectors/" + connName + "/tasks"); > log.trace("Forwarding task configurations for connector > {} to leader", connName); > RestClient.httpRequest(reconfigUrl, "POST", null, > rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); > cb.onCompletion(null, null); > } catch (ConnectException e) { > log.error("Request to leader to reconfigure connector > tasks failed", e); > cb.onCompletion(e, null); > } > } > }); > } > } > {code} > KafkaConfigBackingStore task checks for configuration updates,such as topic > whitelist update.If KafkaConfigBackingStore task is not running on leader > node,an HTTP request will be send to notify the leader of the configuration > update.However,dedicated mm2 cluster does not have the HTTP server turned > on,so the request will fail to be sent,causing the update operation to be > lost. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji opened a new pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji opened a new pull request #8702: URL: https://github.com/apache/kafka/pull/8702 If the request timeout is larger than the rebalance timeout, we should use the former as the JoinGroup request timeout. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id
guozhangwang commented on pull request #8698: URL: https://github.com/apache/kafka/pull/8698#issuecomment-631810146 test this
[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on pull request #8679: URL: https://github.com/apache/kafka/pull/8679#issuecomment-631809507 Added the test. Will merge after Jenkins passes.
[GitHub] [kafka] mjsax commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
mjsax commented on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-631804240 Retest this please.
[GitHub] [kafka] mjsax commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
mjsax commented on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-631804131 Retest this please
[GitHub] [kafka] mjsax commented on pull request #8637: KAFKA-9976: Reuse repartition node in all cases for KGroupedStream and KGroupedTable aggregates
mjsax commented on pull request #8637: URL: https://github.com/apache/kafka/pull/8637#issuecomment-631803677 It seems you agree with my last comment: https://github.com/apache/kafka/pull/8504#issuecomment-631757206 I think it's best to close this PR and also the ticket (either as "not a problem" or "won't fix")?
[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
mjsax commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r428376734 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ## @@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST); } + +@Test +public void shouldReuseRepartitionTopicWithGeneratedName() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); +final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); +final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); +newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one"); +newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to"); +assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString()); +} + +@Test +public void shouldCreateRepartitionTopicsWithUserProvidedName() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); +final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); +final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); +final StreamJoined 
streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()); +newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one"); +newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two"); +final Topology topology = builder.build(props); +System.out.println(topology.describe().toString()); +assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString()); Review comment: I guess both are acceptable solutions (ie, creating two repartition topics or throwing an exception). Your proposal is more user friendly but results in a more expensive deployment. The question might be, what do we try to optimize for? \cc @vvcephei @guozhangwang This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
mjsax commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r424060697 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ## @@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST); } + +@Test +public void shouldReuseRepartitionTopicWithGeneratedName() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); +final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); +final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); +newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one"); +newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to"); +assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString()); +} + +@Test +public void shouldCreateRepartitionTopicsWithUserProvidedName() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); +final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); +final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); +final StreamJoined 
streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()); +newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one"); +newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two"); +final Topology topology = builder.build(props); +System.out.println(topology.describe().toString()); +assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString()); Review comment: Sorry for being undecided... Reading the code now, I am wondering if this behavior may become problematic with regard to topology upgrades. Assume the first join is removed. Technically, the new topology is compatible, but we would now generate a new repartition topic name, and thus it's not compatible. This could be fixed by inserting a `repartition()` in the new code enforcing the old name -- however, this makes me wonder if we might want to throw a "naming conflict" (ie, cannot pick a name) exception based on the original topology for this case when both operators are named, and tell people to insert `repartition()` right away? For this case, if they later remove a join it's clear what is happening to them. Ie, we should still not create two repartition topics, which would be "bad" (users could still enforce it by calling `repartition()` twice), but just throw with an informative error message? -- Curious what @vvcephei thinks?
[jira] [Resolved] (KAFKA-1056) Evenly Distribute Intervals in OffsetIndex
[ https://issues.apache.org/jira/browse/KAFKA-1056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-1056. -- Resolution: Fixed > Evenly Distribute Intervals in OffsetIndex > -- > > Key: KAFKA-1056 > URL: https://issues.apache.org/jira/browse/KAFKA-1056 > Project: Kafka > Issue Type: Improvement > Components: log >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > Today a new entry will be created in OffsetIndex for each produce request > regardless of the number of messages it contains. It is better to evenly > distribute the intervals between index entries for index search efficiency. -- This message was sent by Atlassian Jira (v8.3.4#803005)
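One possible way to space index entries by data volume rather than by request is sketched below. The issue text does not say how the intervals should be distributed, so this is only an assumption: the sketch adds an entry once at least a fixed number of bytes has been appended since the previous entry. `SparseOffsetIndex`, `indexIntervalBytes`, and `append` are hypothetical names, not the Kafka OffsetIndex API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a sparse index with byte-based spacing.
// Not the Kafka OffsetIndex; names and policy are assumptions.
final class SparseOffsetIndex {
    private final int indexIntervalBytes;
    // Each entry is {offset, filePosition}.
    private final List<long[]> entries = new ArrayList<>();
    private long bytesSinceLastEntry = 0L;
    private long position = 0L;

    SparseOffsetIndex(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    // Called once per appended batch, whatever its size.
    void append(long baseOffset, int sizeBytes) {
        // Index the first batch, then only batches that start after at least
        // indexIntervalBytes of data since the last indexed batch.
        if (entries.isEmpty() || bytesSinceLastEntry >= indexIntervalBytes) {
            entries.add(new long[] {baseOffset, position});
            bytesSinceLastEntry = 0L;
        }
        position += sizeBytes;
        bytesSinceLastEntry += sizeBytes;
    }

    int entryCount() {
        return entries.size();
    }
}
```

Under this policy many small produce requests share one index entry, while large requests still get their own, so the gap between entries stays close to the configured interval.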
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112676#comment-17112676 ] Matthias J. Sax commented on KAFKA-6520: The consumer only stores/caches the last known coordinator metadata. There is no coordinator liveness check. The consumer only talks to the coordinator during a rebalance. However, if a network issue occurs during regular processing and the consumer cannot fetch data from the brokers, the cached coordinator metadata stays the same (ie, the coordinator is still known). Does this make sense? In the end, I believe without a consumer change we cannot really resolve the issue though. Maybe we could introduce a new "disconnected timeout" to the consumer: internally, the consumer sends fetch requests on a regular basis. Those fetch requests would time out if there is a network issue. Currently, the consumer "swallows" those timeout exceptions and just keeps retrying. `poll()` would just return zero records, but never rethrow a TimeoutException. The new "disconnected timeout" could be used to set a limit on how long fetch requests should be retried: if all fetch requests time out for a period longer than "disconnected timeout", the next `poll()` call could throw a "DisconnectedException" (reusing TimeoutException could be misleading?). By default, the new "disconnected timeout" would be set to MAX_VALUE, and thus the default behavior would not change. Within KafkaStreams, we can set this new timeout to a smaller value and catch the exception (and change the Kafka Streams state to DISCONNECTED). On a subsequent `poll()` that does not throw, we set the state back to RUNNING. Thoughts? 
> When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.3.4#803005)
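The "disconnected timeout" idea from the comment above can be sketched as a small state tracker. Everything here is hypothetical: no such consumer config or `DisconnectedException` exists in the discussion beyond the proposal, and `DisconnectTracker` with its methods is an illustrative name. The sketch only models the bookkeeping, that is, remembering when the current streak of fetch timeouts began and comparing its length with the configured limit.

```java
// Hypothetical sketch of the proposed bookkeeping; not consumer or Streams code.
final class DisconnectTracker {
    private final long disconnectedTimeoutMs;
    // Start time of the current streak of fetch timeouts; -1 means no streak.
    private long firstFailureMs = -1L;

    DisconnectTracker(long disconnectedTimeoutMs) {
        this.disconnectedTimeoutMs = disconnectedTimeoutMs;
    }

    // Any successful fetch ends the streak.
    void onFetchSuccess() {
        firstFailureMs = -1L;
    }

    // Only the first timeout of a streak records its start time.
    void onFetchTimeout(long nowMs) {
        if (firstFailureMs < 0) {
            firstFailureMs = nowMs;
        }
    }

    // True once fetches have been failing for longer than the limit.
    // With Long.MAX_VALUE as the limit this never becomes true,
    // which matches the "default behavior would not change" remark.
    boolean isDisconnected(long nowMs) {
        return firstFailureMs >= 0 && nowMs - firstFailureMs > disconnectedTimeoutMs;
    }
}
```

In the proposal, `poll()` would throw when `isDisconnected` turns true, and Kafka Streams would map that to a DISCONNECTED state until a later `poll()` succeeds.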
[GitHub] [kafka] hachikuji commented on pull request #8701: MINOR: Add reason to log message when incrementing the log start offset
hachikuji commented on pull request #8701: URL: https://github.com/apache/kafka/pull/8701#issuecomment-631789824 Need to fix some of the uses in test cases. Debating whether I should create an enum for the reason and get rid of the overload...
[GitHub] [kafka] omkreddy commented on pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g
omkreddy commented on pull request #8700: URL: https://github.com/apache/kafka/pull/8700#issuecomment-631785818 retest this please
[GitHub] [kafka] hachikuji opened a new pull request #8701: MINOR: Add reason to log message when incrementing the log start offset
hachikuji opened a new pull request #8701: URL: https://github.com/apache/kafka/pull/8701 Sometimes logging leaves us guessing at the cause of an increment to the log start offset. Since this results in deletion of user data, I think the logging should be clear about the reason. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xvrl commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r428352951 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -270,9 +272,15 @@ Duration adminTimeout() { List metricsReporters() { List reporters = getConfiguredInstances( CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); -JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror"); +JmxReporter jmxReporter = new JmxReporter(); jmxReporter.configure(this.originals()); reporters.add(jmxReporter); +MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror"); Review comment: the KIP mentions that we are deprecating the jmx prefix directly on the JmxReporter, and instead are passing it via the metrics context as the `_namespace` parameter. This doesn't change the prefix or how they are exposed in jmx. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428351889

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ##
@@ -270,9 +272,15 @@ Duration adminTimeout() {
     List metricsReporters() {
         List reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");

Review comment:
@rhauch we mentioned connect more generally in the KIP, but I can clarify to make it explicit
[GitHub] [kafka] C0urante commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs
C0urante commented on a change in pull request #8699: URL: https://github.com/apache/kafka/pull/8699#discussion_r428333743 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.predicates; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; + +/** + * A predicate which is true for records which are tombstones (i.e. have null key). + * @param The type of connect record. + */ +public class RecordIsTombstone> implements Predicate { +@Override +public ConfigDef config() { +return new ConfigDef(); Review comment: Probably won't impact performance too much but we could technically use a single `ConfigDef` instance for the entire class instead of creating a new one every time this method is called. ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.predicates; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; + +/** + * A predicate which is true for records which are tombstones (i.e. have null key). + * @param The type of connect record. + */ +public class RecordIsTombstone> implements Predicate { +@Override +public ConfigDef config() { +return new ConfigDef(); +} + +@Override +public boolean test(R record) { +return record.key() == null; Review comment: I think we want to check the value instead of the key here? ```suggestion return record.value() == null; ``` ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.predicates; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; + +/** + * A predicate which is true for records which are tombstones (i.e. have null key). Review comment: I think a tombstone is defined as a record with a null value, not a null key: ```suggestion * A predicate which is true for records which are tombstones (i.e. have null values). ``` ## File path: connect/runtime/src/main/java/org/apa
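The two suggestions in this review (test the value rather than the key, and reuse one instance instead of allocating on every call) can be sketched without the Connect classes. This is only an illustration under stated assumptions: `SimpleRecord` is a hypothetical stand-in for a Connect record, and `EMPTY_CONFIG` is a hypothetical placeholder for the shared `ConfigDef` instance mentioned in the comment.

```java
import java.util.Collections;
import java.util.Map;

public class TombstoneSketch {
    /** Hypothetical stand-in for a Connect record; not the real ConnectRecord class. */
    public static final class SimpleRecord {
        public final Object key;
        public final Object value;

        public SimpleRecord(Object key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    // One shared, immutable instance for the whole class (placeholder for a shared ConfigDef).
    private static final Map<String, Object> EMPTY_CONFIG = Collections.emptyMap();

    /** Returns the same instance on every call instead of creating a new one. */
    public static Map<String, Object> config() {
        return EMPTY_CONFIG;
    }

    /** A tombstone is a record with a null value; the key is not inspected. */
    public static boolean isTombstone(SimpleRecord record) {
        return record.value == null;
    }

    public static void main(String[] args) {
        System.out.println(isTombstone(new SimpleRecord("k", null)));
        System.out.println(isTombstone(new SimpleRecord(null, "v")));
    }
}
```

A record with a null key but a non-null value is not treated as a tombstone here, which is the distinction the reviewer is drawing.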
[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on pull request #8654: URL: https://github.com/apache/kafka/pull/8654#issuecomment-631762530 Thanks for the review, @kkonstantine. I think I've incorporated all of your suggestions.
[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on pull request #8654: URL: https://github.com/apache/kafka/pull/8654#issuecomment-631762616 ok to test
[GitHub] [kafka] bbejeck commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
bbejeck commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r428343055 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ## @@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST); } + +@Test +public void shouldReuseRepartitionTopicWithGeneratedName() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); +final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); +final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); +newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one"); +newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to"); +assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString()); +} + +@Test +public void shouldCreateRepartitionTopicsWithUserProvidedName() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); +final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); +final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); +final KStream newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); +final 
StreamJoined streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()); +newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one"); +newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two"); +final Topology topology = builder.build(props); +System.out.println(topology.describe().toString()); +assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString()); Review comment: >This could be fixed by inserting a repartition() in the new code enforcing the old name -- however, this makes me wonder if we might want to throw a "naming conflict" (ie, cannot pick a name) exception based on the original topology for this case when both operators are named, and tell people to insert repartition() right away? For this case, if they later remove a join it's clear what is happening to them. I see your point, but I think that is a bad user experience and IMHO leaks too much detail about an operation we want to handle automatically. I'm leaning towards the simpler case of what we had before. With generated names, re-use the repartition node, but if the user creates a new join with explicit names, just go ahead and create two repartition topics. WDYT?
[GitHub] [kafka] bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins
bbejeck commented on pull request #8504: URL: https://github.com/apache/kafka/pull/8504#issuecomment-631757206 > @bbejeck Are there any backward compatibility concerns for KAFKA-9976? Good point, maybe we should just leave this one alone for now. If you agree I'll close that PR.
[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id
guozhangwang commented on pull request #8698: URL: https://github.com/apache/kafka/pull/8698#issuecomment-631755813 There are known flaky tests that may fail; I'm validating that there are no consistent test failures. @xinzhuxiansheng
[GitHub] [kafka] guozhangwang merged pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
guozhangwang merged pull request #8661: URL: https://github.com/apache/kafka/pull/8661
[jira] [Updated] (KAFKA-9878) Block AddPartitionsToTxn call until the txn markers are committed
[ https://issues.apache.org/jira/browse/KAFKA-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9878: --- Description: Currently the EndTxn call from Producer will immediately return as the control record is written to the txn coordinator log. The ongoing transaction will be going to a pending state to wait for all txn markers to be propagated. In the meantime, producer client will start another new transaction but being rejected constantly until the pending state gets resolved, which is unnecessary round trips and more burden to the broker to handle repetitive requests. To avoid this situation, we should make the Producer client wait for txn marker completion instead. This will incur better performance overall, as no more back-off shall be triggered for a subsequent transaction to begin. On the other hand, we could also batch complete the AddPartitionsToTxn results if we buffered more than one request in the queue. The third change is on the client side, which is to maintain the futures of the AddPartitionsToTxn calls to make more inflight changes as necessary. was: Currently the EndTxn call from Producer will immediately return as the control record is written to the txn coordinator log. The ongoing transaction will be going to a pending state to wait for all txn markers to be propagated. In the meantime, producer client will start another new transaction but being rejected constantly until the pending state gets resolved, which is unnecessary round trips and more burden to the broker to handle repetitive requests. To avoid this situation, we should make the Producer client wait for txn marker completion instead. This will incur better performance overall, as no more back-off shall be triggered for a subsequent transaction to begin. 
> Block AddPartitionsToTxn call until the txn markers are committed > - > > Key: KAFKA-9878 > URL: https://issues.apache.org/jira/browse/KAFKA-9878 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > Currently the EndTxn call from Producer will immediately return as the > control record is written to the txn coordinator log. The ongoing transaction > will be going to a pending state to wait for all txn markers to be > propagated. In the meantime, producer client will start another new > transaction but being rejected constantly until the pending state gets > resolved, which is unnecessary round trips and more burden to the broker to > handle repetitive requests. > To avoid this situation, we should make the Producer client wait for txn > marker completion instead. This will incur better performance overall, as no > more back-off shall be triggered for a subsequent transaction to begin. > On the other hand, we could also batch complete the AddPartitionsToTxn > results if we buffered more than one request in the queue. > The third change is on the client side, which is to maintain the futures of > the AddPartitionsToTxn calls to make more inflight changes as necessary. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on a change in pull request #8654: URL: https://github.com/apache/kafka/pull/8654#discussion_r428333268 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the creation of internal topics. 
+ */ +@Category(IntegrationTest.class) +public class InternalTopicsIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); + +private EmbeddedConnectCluster.Builder connectBuilder; +private EmbeddedConnectCluster connect; +Map workerProps = new HashMap<>(); +Properties brokerProps = new Properties(); + +@Before +public void setup() { +// setup Kafka broker properties +brokerProps.put("auto.create.topics.enable", String.valueOf(false)); + +// build a Connect cluster backed by Kafka and Zk +connectBuilder = new EmbeddedConnectCluster.Builder() +.name("connect-cluster") +.numWorkers(1) +.numBrokers(1) +.brokerProps(brokerProps); +} + +@After +public void close() { +// stop all Connect, Kafka and Zk threads. +connect.stop(); +} + +@Test +public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException { +int numWorkers = 1; +int numBrokers = 3; +connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") Review comment: I was running into other issues, and I never changed it. The names are not important, so I'll change them back to the same cluster name. I actually do prefer how each test fully sets up the cluster builder, in part because some of the information is changed and some isn't. I found it very error prone while I was working on this to reuse a builder and have each test only change *some* of the attributes, or to remember to set the worker props after changing the `workerProps` map. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on a change in pull request #8654: URL: https://github.com/apache/kafka/pull/8654#discussion_r428334196 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the creation of internal topics. 
+ */ +@Category(IntegrationTest.class) +public class InternalTopicsIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); + +private EmbeddedConnectCluster.Builder connectBuilder; +private EmbeddedConnectCluster connect; +Map workerProps = new HashMap<>(); +Properties brokerProps = new Properties(); + +@Before +public void setup() { +// setup Kafka broker properties +brokerProps.put("auto.create.topics.enable", String.valueOf(false)); + +// build a Connect cluster backed by Kafka and Zk +connectBuilder = new EmbeddedConnectCluster.Builder() +.name("connect-cluster") +.numWorkers(1) +.numBrokers(1) +.brokerProps(brokerProps); +} + +@After +public void close() { +// stop all Connect, Kafka and Zk threads. +connect.stop(); +} + +@Test +public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException { +int numWorkers = 1; +int numBrokers = 3; +connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") + .workerProps(workerProps) + .numWorkers(numWorkers) + .numBrokers(numBrokers) + .brokerProps(brokerProps) + .build(); + +// Start the Connect cluster +connect.start(); +connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time."); +connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker did not start in time."); +log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers); + +// Check the topics +log.info("Verifying the internal topics for Connect"); +connect.assertions().assertTopicsExist(configTopic(), offsetTopic(), statusTopic()); +assertInternalTopicSettings(); + +// Remove the Connect worker +log.info("Stopping the Connect worker"); +connect.removeWorker(); + +// Sleep for a bit +Thread.sleep(3000); Review comment: Good catch. I had put this in while debugging an issue, but it is not needed. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] rhauch commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on a change in pull request #8654: URL: https://github.com/apache/kafka/pull/8654#discussion_r428333268 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the creation of internal topics. 
+ */ +@Category(IntegrationTest.class) +public class InternalTopicsIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); + +private EmbeddedConnectCluster.Builder connectBuilder; +private EmbeddedConnectCluster connect; +Map workerProps = new HashMap<>(); +Properties brokerProps = new Properties(); + +@Before +public void setup() { +// setup Kafka broker properties +brokerProps.put("auto.create.topics.enable", String.valueOf(false)); + +// build a Connect cluster backed by Kafka and Zk +connectBuilder = new EmbeddedConnectCluster.Builder() +.name("connect-cluster") +.numWorkers(1) +.numBrokers(1) +.brokerProps(brokerProps); +} + +@After +public void close() { +// stop all Connect, Kafka and Zk threads. +connect.stop(); +} + +@Test +public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException { +int numWorkers = 1; +int numBrokers = 3; +connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") Review comment: I was running into other issues, and I never changed it. I actually prefer how each test fully sets up the cluster builder, but the names should not be important. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #8270: KAFKA-9216: Enforce connect internal topic configuration at startup
rhauch commented on pull request #8270: URL: https://github.com/apache/kafka/pull/8270#issuecomment-631747698 ok to test
[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id
guozhangwang commented on pull request #8698: URL: https://github.com/apache/kafka/pull/8698#issuecomment-631738575 test this please
[GitHub] [kafka] guozhangwang commented on pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
guozhangwang commented on pull request #8661: URL: https://github.com/apache/kafka/pull/8661#issuecomment-631737855 test this
[GitHub] [kafka] guozhangwang commented on pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
guozhangwang commented on pull request #8661: URL: https://github.com/apache/kafka/pull/8661#issuecomment-631738264 test this
[GitHub] [kafka] guozhangwang commented on pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
guozhangwang commented on pull request #8661: URL: https://github.com/apache/kafka/pull/8661#issuecomment-631737464 test this please
[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner commented on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-631702750 One of the builds failed due to one of the unit tests added in this PR. It was a bug in the test, which was waiting on the wrong future, one whose name was similar to another's. I fixed the test to wait on the right future and renamed the variable so that the difference is easier to spot. @mjsax Could you please rerun the tests?
[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs
tombentley commented on pull request #8699: URL: https://github.com/apache/kafka/pull/8699#issuecomment-631683003 @C0urante you might also want to take a look.
[GitHub] [kafka] C0urante commented on pull request #8357: KAFKA-9767: Add logging to basic auth rest extension
C0urante commented on pull request #8357: URL: https://github.com/apache/kafka/pull/8357#issuecomment-631682508 Thanks @rhauch; I've added the requested unit test. This is ready for another round when you have time.
[GitHub] [kafka] C0urante commented on a change in pull request #8357: KAFKA-9767: Add logging to basic auth rest extension
C0urante commented on a change in pull request #8357: URL: https://github.com/apache/kafka/pull/8357#discussion_r428260801 ## File path: connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java ## @@ -67,36 +87,60 @@ public void filter(ContainerRequestContext requestContext) throws IOException { private String password; public BasicAuthCallBackHandler(String credentials) { -if (credentials != null) { -int space = credentials.indexOf(SPACE); -if (space > 0) { -String method = credentials.substring(0, space); -if (BASIC.equalsIgnoreCase(method)) { -credentials = credentials.substring(space + 1); -credentials = new String(Base64.getDecoder().decode(credentials), - StandardCharsets.UTF_8); -int i = credentials.indexOf(COLON); -if (i > 0) { -username = credentials.substring(0, i); -password = credentials.substring(i + 1); -} -} -} +if (credentials == null) { +log.trace("No credentials were provided with the request"); +return; } + +int space = credentials.indexOf(SPACE); +if (space <= 0) { +log.trace("Request credentials were malformed; no space present in value for authorization header"); +return; +} + +String method = credentials.substring(0, space); +if (!BASIC.equalsIgnoreCase(method)) { +log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, BASIC); +return; +} + +credentials = credentials.substring(space + 1); +credentials = new String(Base64.getDecoder().decode(credentials), + StandardCharsets.UTF_8); +int i = credentials.indexOf(COLON); +if (i <= 0) { +log.trace("Request credentials were malformed; no colon present between username and password"); +return; +} + +username = credentials.substring(0, i); +password = credentials.substring(i + 1); } @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { +List unsupportedCallbacks = new ArrayList<>(); for (Callback callback : callbacks) { if (callback instanceof NameCallback) { 
((NameCallback) callback).setName(username); } else if (callback instanceof PasswordCallback) { -((PasswordCallback) callback).setPassword(password.toCharArray()); +((PasswordCallback) callback).setPassword(password != null +? password.toCharArray() +: null +); } else { -throw new UnsupportedCallbackException(callback, "Supports only NameCallback " - + "and PasswordCallback"); +unsupportedCallbacks.add(callback); } } +if (!unsupportedCallbacks.isEmpty()) +throw new ConnectException(String.format( +"Unsupported callbacks %s; request authentication will fail. " ++ "This indicates the Connect worker was configured with a JAAS " ++ "LoginModule that is incompatible with the %s, and will need to be " ++ "corrected and restarted.", +unsupportedCallbacks, +BasicAuthSecurityRestExtension.class.getSimpleName() +)); Review comment: Ack, added a new test for an unsupported callback. We have tests already for the green path, so it didn't seem necessary to add more than that.
[GitHub] [kafka] C0urante commented on a change in pull request #8357: KAFKA-9767: Add logging to basic auth rest extension
C0urante commented on a change in pull request #8357: URL: https://github.com/apache/kafka/pull/8357#discussion_r428260448 ## File path: connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java ## @@ -67,36 +87,60 @@ public void filter(ContainerRequestContext requestContext) throws IOException { private String password; public BasicAuthCallBackHandler(String credentials) { -if (credentials != null) { -int space = credentials.indexOf(SPACE); -if (space > 0) { -String method = credentials.substring(0, space); -if (BASIC.equalsIgnoreCase(method)) { -credentials = credentials.substring(space + 1); -credentials = new String(Base64.getDecoder().decode(credentials), - StandardCharsets.UTF_8); -int i = credentials.indexOf(COLON); -if (i > 0) { -username = credentials.substring(0, i); -password = credentials.substring(i + 1); -} -} -} +if (credentials == null) { +log.trace("No credentials were provided with the request"); +return; } + +int space = credentials.indexOf(SPACE); +if (space <= 0) { +log.trace("Request credentials were malformed; no space present in value for authorization header"); +return; +} + +String method = credentials.substring(0, space); +if (!BASIC.equalsIgnoreCase(method)) { +log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, BASIC); +return; +} + +credentials = credentials.substring(space + 1); +credentials = new String(Base64.getDecoder().decode(credentials), + StandardCharsets.UTF_8); +int i = credentials.indexOf(COLON); +if (i <= 0) { +log.trace("Request credentials were malformed; no colon present between username and password"); +return; +} + +username = credentials.substring(0, i); +password = credentials.substring(i + 1); } @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { +List unsupportedCallbacks = new ArrayList<>(); Review comment: 👍 happy to optimize if you'd like, but it sounds like we can leave 
it as-is for now at least.
[GitHub] [kafka] omkreddy commented on pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g
omkreddy commented on pull request #8700: URL: https://github.com/apache/kafka/pull/8700#issuecomment-631677805 retest this please
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r428255598 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java ## @@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig); -KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); +KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId); offsetBackingStore.configure(distributedConfig); Review comment: Get kafkaClusterId from ConnectUtils.lookupKafkaClusterId in configure(...) of KafkaOffsetBackingStore.
[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112535#comment-17112535 ] Sophie Blee-Goldman commented on KAFKA-9987: [~hai_lin] I put this on pause to try and get another KIP into 2.6, but the non-testing code is all ready. I just haven't called for review on the PR yet since I'm still finishing up the unit tests/benchmarks. So, yeah, the patch is ready if you want to do some local testing. I can't promise that there are no bugs, but it's a pretty straightforward algorithm and I've run some initial benchmarks with it working. > Improve sticky partition assignor algorithm > --- > > Key: KAFKA-9987 > URL: https://issues.apache.org/jira/browse/KAFKA-9987 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > In > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > we added the new CooperativeStickyAssignor which leverages on the underlying > sticky assignment algorithm of the existing StickyAssignor (moved to > AbstractStickyAssignor). The algorithm is fairly complex as it tries to > optimize stickiness while satisfying perfect balance _in the case individual > consumers may be subscribed to different subsets of the topics._ While it > does a pretty good job at what it promises to do, it doesn't scale well with > large numbers of consumers and partitions. > To give a concrete example, users have reported that it takes 2.5 minutes for > the assignment to complete with just 2100 consumers reading from 2100 > partitions. Since partitions revoked during the first of two cooperative > rebalances will remain unassigned until the end of the second rebalance, it's > important for the rebalance to be as fast as possible. 
And since one of the > primary improvements of the cooperative rebalancing protocol is better > scaling experience, the only OOTB cooperative assignor should not itself > scale poorly > If we can constrain the problem a bit, we can simplify the algorithm greatly. > In many cases the individual consumers won't be subscribed to some random > subset of the total subscription, they will all be subscribed to the same set > of topics and rely on the assignor to balance the partition workload. > We can detect this case by checking the group's individual subscriptions and > call on a more efficient assignment algorithm. -- This message was sent by Atlassian Jira (v8.3.4#803005)
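The detection step described in the issue above ("checking the group's individual subscriptions") can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the patch: the class name `SubscriptionCheck`, the helper `allSubscriptionsEqual`, and the use of a plain map from consumer id to topic list are all hypothetical and are not part of the Kafka assignor API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SubscriptionCheck {

    // Hypothetical helper: returns true when every consumer in the group
    // subscribes to exactly the same set of topics (order is ignored).
    // An empty group is treated as trivially uniform.
    static boolean allSubscriptionsEqual(Map<String, List<String>> subscriptions) {
        Set<String> first = null;
        for (List<String> topics : subscriptions.values()) {
            Set<String> current = new HashSet<>(topics);
            if (first == null) {
                first = current;
            } else if (!first.equals(current)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> group = new HashMap<>();
        group.put("consumer-1", Arrays.asList("orders", "payments"));
        group.put("consumer-2", Arrays.asList("payments", "orders"));

        // When this check passes, an assignor could take the simpler,
        // constrained code path instead of the general algorithm.
        System.out.println("uniform subscriptions: " + allSubscriptionsEqual(group));
    }
}
```

The check is a single pass over the group, so it should add little cost even when it fails and the general algorithm has to be used anyway.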
[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs
tombentley commented on pull request #8699: URL: https://github.com/apache/kafka/pull/8699#issuecomment-631651186 Still need to review test coverage, but @kkonstantine, @mimaison, @bbejeck you might want to give it an initial pass.
[GitHub] [kafka] tombentley opened a new pull request #8699: KAFKA-9673: Filter and Conditional SMTs
tombentley opened a new pull request #8699: URL: https://github.com/apache/kafka/pull/8699 * Add Predicate interface * Add Filter SMT * Add the predicate implementations defined in the KIP. * Create abstraction in ConnectorConfig for configuring Transformations and Connectors with the "alias prefix" mechanism * Add tests and fix existing tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] omkreddy opened a new pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g
omkreddy opened a new pull request #8700: URL: https://github.com/apache/kafka/pull/8700 We have seen out-of-memory errors in builds. This PR increases the Gradle daemon's heap size to 2g. ``` [Error] : Error while emitting kafka/log/LogTest [2020-05-20T11:33:15.133Z] GC overhead limit exceeded [2020-05-20T11:33:15.133Z] one error found ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering
[ https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112511#comment-17112511 ] Guozhang Wang commented on KAFKA-8159: -- Hey John, What I was asking about actually goes beyond signed numerical types -- for which I agree offering new serdes as opt-in should be sufficient -- and applies to any types like the ones you described in the example above. Today our javadoc for a range query looks like this: {code} /** * Get an iterator over a given range of keys. This iterator must be closed after use. * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s * and must not return null values. No ordering guarantees are provided. * @param from The first key that could be in the range * @param to The last key that could be in the range * @return The iterator for this range. * @throws NullPointerException If null is used for from or to. * @throws InvalidStateStoreException if the store is not initialized */ KeyValueIterator<K, V> range(K from, K to); {code} For most users the {{from}} < {{to}} relationship is implicitly defined as `from.compareTo(to) <= 0`. However, what they mostly also assume, but what is not actually guaranteed, is that `serialize(from).compareTo(serialize(to)) <= 0`. We should make it clear in the javadoc that the "first / last" key in the range is actually defined based on the serialized bytes, not on the objects, and that it is the user's responsibility either to make sure the serializers correctly carry the object ordering over to the byte ordering, or to pass in {{from / to}} parameters that already obey the byte ordering. 
> Built-in serdes for signed numbers do not obey lexicographical ordering > --- > > Key: KAFKA-8159 > URL: https://issues.apache.org/jira/browse/KAFKA-8159 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > Currently we assume consistent ordering between serialized and deserialized > keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes > will also obey bytesA < bytesB < bytesC. This is not true in general of the > built-in serdes for signed numerical types (eg Integer, Long). Specifically, > it is broken by the negative number representations which are > lexicographically greater than (all) positive number representations. > > One consequence of this is that an interactive query of a key range with a > negative lower bound and positive upper bound (eg keyValueStore.range(-1, 1)) > will result in "unexpected behavior" depending on the specific store type. > > For RocksDB stores with caching disabled, an empty iterator will be returned > regardless of whether any records do exist in that range. > For in-memory stores and ANY store with caching enabled, Streams will throw > an unchecked exception and crash. -- This message was sent by Atlassian Jira (v8.3.4#803005)
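The ordering mismatch discussed in this thread can be reproduced with a few lines of plain Java. The sketch below assumes a big-endian two's-complement encoding for integers, which is how I understand the built-in Integer serde to lay out its bytes; the class `SignedKeyOrdering` and its methods are hypothetical names for illustration, and the sign-bit flip in `serializeOrdered` is one well-known order-preserving encoding, not something the ticket itself proposes.

```java
import java.nio.ByteBuffer;

public class SignedKeyOrdering {

    // Assumed encoding: 4 bytes, big-endian, two's complement.
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Illustrative order-preserving variant: flipping the sign bit maps
    // Integer.MIN_VALUE to 0x00000000 and Integer.MAX_VALUE to 0xFFFFFFFF,
    // so unsigned byte order matches signed integer order.
    static byte[] serializeOrdered(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ Integer.MIN_VALUE).array();
    }

    // Lexicographic comparison that treats each byte as unsigned, which is
    // how byte-oriented stores typically order their keys.
    static int compareLexicographically(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        // Object ordering: -1 < 1, so the result is negative.
        System.out.println(Integer.compare(-1, 1));
        // Plain encoding: -1 is FF FF FF FF and 1 is 00 00 00 01,
        // so the bytes of -1 sort AFTER the bytes of 1 (positive result).
        System.out.println(compareLexicographically(serialize(-1), serialize(1)));
        // Sign-flipped encoding restores the expected order (negative result).
        System.out.println(compareLexicographically(serializeOrdered(-1), serializeOrdered(1)));
    }
}
```

With the plain encoding, a call like `range(-1, 1)` hands the store a lower bound whose bytes sort above the upper bound, which is consistent with the empty iterator and the exception described in the issue.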
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112508#comment-17112508 ] Hai Lin edited comment on KAFKA-4084 at 5/20/20, 6:18 PM: -- Thanks [~sql_consulting] point me to this ticket. [~junrao] Just want to hear a bit more about why KIP-491 is not in consideration based on your comment above. I am not sure if KIP-491 is necessarily the best approach to address this particular issue (in general, one probably shouldn't have any broker overloaded at any time). However, if there are other convincing use cases, we could consider it. I feel it's a very useful feature for a lot of operation cases: 1. For high replica rate when broker boot up: To me uneven size of partition on production is very command, with throttle some big partitions will take much longer to get fully replicated. Sometimes we just want a fully replica broker(like in 10 minutes without replica rather than hours). A long time with under replica broker in the system add more complicity for operation. For example, we need to be careful there is no other broker is offline during the replicating process. 2 Other situation like outlier broker This happen pretty often if the cluster is big, most of the time it's not easy(at least time consuming) to replace broker even with EBS. We would like to disable a broker as leader but not take it offline. So the on-call have time to investigate the problem without terminate it right away. With KIP-491 we can add a lot of automation to the system that handle some network partition for a single broker without actually replace it. 3 Potential If we can manipulate the view of leader in a cluster, we can do a bit more like introduce different leader for producer and consumer(consumer now can consumer from replica but I think there is still way we can control it). Then we can add priority to the client level and isolate client to talk only some of the brokers. 
This is more for KIP-491, we can surely move it back to the original ticket if we feel there is more discussion for this. was (Author: hai_lin): Thanks [~sql_consulting] point me to this ticket. [~junrao] Just want to hear a bit more about why KIP-491 is not in consideration based on your comment above. {*quote*}I am not sure if KIP-491 is necessarily the best approach to address this particular issue (in general, one probably shouldn't have any broker overloaded at any time). However, if there are other convincing use cases, we could consider it. {*quote*} I feel it's a very useful feature for a lot of operation cases: 1. For high replica rate when broker boot up: To me uneven size of partition on production is very command, with throttle some big partitions will take much longer to get fully replicated. Sometimes we just want a fully replica broker(like in 10 minutes without replica rather than hours). A long time with under replica broker in the system add more complicity for operation. For example, we need to be careful there is no other broker is offline during the replicating process. 2 Other situation like outlier broker This happen pretty often if the cluster is big, most of the time it's not easy(at least time consuming) to replace broker even with EBS. We would like to disable a broker as leader but not take it offline. So the on-call have time to investigate the problem without terminate it right away. With KIP-491 we can add a lot of automation to the system that handle some network partition for a single broker without actually replace it. 3 Potential If we can manipulate the view of leader in a cluster, we can do a bit more like introduce different leader for producer and consumer(consumer now can consumer from replica but I think there is still way we can control it). Then we can add priority to the client level and isolate client to talk only some of the brokers. 
This is more for KIP-491, we can surely move it back to the original ticket if we feel there is more discussion for this. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions`
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112508#comment-17112508 ] Hai Lin commented on KAFKA-4084: Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want to hear a bit more about why KIP-491 is not under consideration, based on your comment above. {*quote*} I am not sure if KIP-491 is necessarily the best approach to address this particular issue (in general, one probably shouldn't have any broker overloaded at any time). However, if there are other convincing use cases, we could consider it. {*quote*} I feel it's a very useful feature for a lot of operational cases: 1. High replication rate when a broker boots up: To me, uneven partition sizes in production are very common, and with throttling some big partitions will take much longer to get fully replicated. Sometimes we just want a fully replicated broker (like in 10 minutes without replica rather than hours). A long time with an under-replicated broker in the system adds more complexity for operations. For example, we need to be careful that no other broker goes offline during the replication process. 2. Other situations like an outlier broker: This happens pretty often if the cluster is big, and most of the time it's not easy (or at least it's time-consuming) to replace a broker, even with EBS. We would like to disable a broker as leader but not take it offline, so the on-call has time to investigate the problem without terminating it right away. With KIP-491 we can add a lot of automation to the system that handles a network partition for a single broker without actually replacing it. 3. Potential: If we can manipulate the view of the leader in a cluster, we can do a bit more, like introducing different leaders for producers and consumers (consumers can now consume from a replica, but I think there is still a way we can control it). Then we can add priority at the client level and isolate clients to talk to only some of the brokers. 
This is more for KIP-491, we can surely move it back to the original ticket if we feel there is more discussion for this. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. 
> One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
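The potential fix mentioned at the end of the issue (cap the number of leader rebalances in flight and wait for them to finish before starting more) could look roughly like the batching sketch below. Everything here is hypothetical: the class `BatchedLeaderRebalance`, the `batches` helper, the `maxInFlight` parameter and the partition names are illustrative assumptions, and the real controller logic is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchedLeaderRebalance {

    // Hypothetical helper: split the imbalanced partitions into consecutive
    // batches of at most maxInFlight elements each.
    static <T> List<List<T>> batches(List<T> items, int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxInFlight) {
            int end = Math.min(start + maxInFlight, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> imbalanced = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");

        // Each batch would be handed to the leader election step, and the
        // next batch would only start once the previous one has completed.
        for (List<String> batch : batches(imbalanced, 2)) {
            System.out.println("rebalancing leaders for " + batch);
        }
    }
}
```

Processing one bounded batch at a time means only a fraction of the replica fetchers would be restarted at any moment, which is the effect the issue is asking for.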
[GitHub] [kafka] C0urante commented on pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions
C0urante commented on pull request #8511: URL: https://github.com/apache/kafka/pull/8511#issuecomment-631633951 Thanks @kkonstantine, I've addressed both of your comments. Ready for another round when you have time.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r428205238 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -92,6 +93,7 @@ private final ExecutorService executor; private final Time time; private final String workerId; +private final String clusterId; Review comment: I'm still keeping clusterId as a class variable in Worker, since we use clusterId in other methods of this class. We get clusterId from ConnectUtils.lookupKafkaClusterId inside the Worker constructor and keep it as a class variable.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r428203784 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -92,6 +93,7 @@ private final ExecutorService executor; private final Time time; private final String workerId; +private final String clusterId; Review comment: Removed clusterId from Worker constructor.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r428203600 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java ## @@ -93,7 +93,7 @@ public static void main(String[] args) { config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG), config, ConnectorClientConfigOverridePolicy.class); Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(), - connectorClientConfigOverridePolicy); + connectorClientConfigOverridePolicy, kafkaClusterId); Review comment: Removed clusterId from Worker constructor.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r428203369 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java ## @@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) { URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); -KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); +KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId); offsetBackingStore.configure(config); ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin( config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG), config, ConnectorClientConfigOverridePolicy.class); -Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy); +Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId); WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); -StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); +StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId); statusBackingStore.configure(config); ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( internalValueConverter, config, -configTransformer); +configTransformer, +kafkaClusterId); Review comment: Removed clusterId from the backing store constructors.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203009

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java ##
@@ -101,24 +101,25 @@ public Connect startConnect(Map workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(config);
 ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
 config, ConnectorClientConfigOverridePolicy.class);
-Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);

Review comment:
Removed clusterId from KafkaStatusBackingStore constructor.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202532

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java ##
@@ -101,24 +101,25 @@ public Connect startConnect(Map workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);

Review comment:
Removed clusterId from KafkaOffsetBackingStore constructor
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202742

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java ##
@@ -101,24 +101,25 @@ public Connect startConnect(Map workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(config);
 ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
 config, ConnectorClientConfigOverridePolicy.class);
-Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
Removed clusterId from Worker constructor.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202270

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java ##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig distributedConfig = new DistributedConfig(workerProps);
 String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(distributedConfig);
-Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
 statusBackingStore.configure(distributedConfig);

Review comment:
Removed clusterId from KafkaStatusBackingStore constructor
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202016

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java ##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig distributedConfig = new DistributedConfig(workerProps);
 String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(distributedConfig);
-Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
 statusBackingStore.configure(distributedConfig);
 ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
 internalValueConverter,
 distributedConfig,
-configTransformer);
+configTransformer,
+kafkaClusterId);

Review comment:
Removed clusterId from KafkaConfigBackingStore constructor.
[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428201738

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java ##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig distributedConfig = new DistributedConfig(workerProps);
 String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(distributedConfig);

Review comment:
Removed clusterId from KafkaOffsetBackingStore constructor.
[jira] [Resolved] (KAFKA-9859) kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation
[ https://issues.apache.org/jira/browse/KAFKA-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Levani Kokhreidze resolved KAFKA-9859.
--
Fix Version/s: 2.6.0
Resolution: Fixed

Fixed with PR [https://github.com/apache/kafka/pull/8671]

> kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation
>
> Key: KAFKA-9859
> URL: https://issues.apache.org/jira/browse/KAFKA-9859
> Project: Kafka
> Issue Type: Bug
> Components: streams, tools
> Reporter: Levani Kokhreidze
> Assignee: Levani Kokhreidze
> Priority: Major
> Labels: newbie, newbie++
> Fix For: 2.6.0
>
> Steps to reproduce:
> * Create a Kafka Streams application which uses the foreign key join operation (without a Named parameter overload)
> * Stop the Kafka Streams application
> * Perform `kafka-topics-list` and verify that the foreign key operation internal topics are generated
> * Use `kafka-streams-application-reset` to perform the cleanup of your Kafka Streams application: `kafka-streams-application-reset --application-id --input-topics --bootstrap-servers --to-datetime 2019-04-13T00:00:00.000`
> * Perform `kafka-topics-list` again; you'll see that the topics generated by the foreign key operation are still there.
>
> [kafka-streams-application-reset|#L679-L680]] uses the `-subscription-registration-topic` and `-subscription-response-topic` suffixes to match topics generated by the foreign key operation. In reality, the internal topics are generated in this format:
> {code:java}
> -KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION- number>-topic
> -KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE- number>-topic{code}
> Please note that this problem only happens when the `Named` parameter is not used. When the named parameter is used, topics are generated with the same pattern as specified in StreamsResetter.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
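The mismatch described in KAFKA-9859 can be illustrated with a small sketch. It is not the code from StreamsResetter or from PR 8671. The class name, the sample topic names, and the assumption that the generated number is a run of digits are all hypothetical. It only shows why a suffix check on `-subscription-registration-topic` cannot match an auto-generated `-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-...-topic` name.

```java
import java.util.regex.Pattern;

public class FkJoinTopicMatch {
    // Suffix-only check, modelled on the behaviour described in the report
    // (a simplification, not the real StreamsResetter code).
    public static boolean matchesNamedSuffix(String topic) {
        return topic.endsWith("-subscription-registration-topic")
            || topic.endsWith("-subscription-response-topic");
    }

    // Hypothetical pattern for auto-generated names; the numeric part is
    // assumed to consist of digits only.
    private static final Pattern GENERATED = Pattern.compile(
        ".*-KTABLE-FK-JOIN-SUBSCRIPTION-(REGISTRATION|RESPONSE)-\\d+-topic");

    public static boolean matchesGenerated(String topic) {
        return GENERATED.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Both topic names below are made up for illustration.
        String generated = "my-app-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic";
        String named = "my-app-my-join-subscription-registration-topic";

        System.out.println(matchesNamedSuffix(generated)); // false: the suffix check misses it
        System.out.println(matchesGenerated(generated));   // true
        System.out.println(matchesNamedSuffix(named));     // true: named joins already matched
    }
}
```

A reset tool would need both checks (or one combined pattern) to cover joins with and without a `Named` parameter.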
[jira] [Commented] (KAFKA-9899) LogCleaner Tries To Clean Single Partition Over 1000x/Minute
[ https://issues.apache.org/jira/browse/KAFKA-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112460#comment-17112460 ]

Jeff Nadler commented on KAFKA-9899:

Upgraded to 2.5.0, still experiencing this issue.

> LogCleaner Tries To Clean Single Partition Over 1000x/Minute
>
> Key: KAFKA-9899
> URL: https://issues.apache.org/jira/browse/KAFKA-9899
> Project: Kafka
> Issue Type: Bug
> Components: log cleaner
> Affects Versions: 2.4.1
> Environment: ubuntu bionic, openjdk11.0.6, kafka 2.4.1
> Reporter: Jeff Nadler
> Priority: Major
> Attachments: CPU-Usage.png
>
> I had previously believed this to be the same issue as KAFKA-8764, but I took a closer look when it persisted after upgrading to 2.4.1 and now believe this is a different bug.
> For a very low-traffic compact topic, the log cleaner will sometimes (for a period of usually 2 hours or longer) get stuck in a loop where it tries to clean the same partition for the same offset range nonstop, and the log cleaner thread consumes 100% of a single core during this time.
> h4. 1396 attempts in a single minute:
>
> {{root@stage-obs-kafka01:/var/log/kafka# cat log-cleaner.log | grep 22:22: | grep "offset range" | wc -l}}
> {{1396}}
>
> h4. All 1396 of these are looking at the same partition and same offset range:
> {{[2020-04-21 22:22:59,862] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 0 segments in offset range [22943108, 22912825). (kafka.log.LogCleaner)}}
>
> These attempts are separated by only 30ms on average. This is a small 3-node cluster; note that the attached CPU graph is very clearly bimodal for each node: low when the log cleaner is not "stuck", and much higher when it is.
> Eventually the log cleaner appears to find a segment to clean (because enough traffic has arrived?) and the loop is broken... for a time. Note that it finds "1 segments" and then finally moves on to check other topic-partitions.
> {{...tens of thousands of this first one then}}
> {{[2020-04-21 20:06:02,531] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 0 segments in *offset range* [23591841, 23575583). (kafka.log.LogCleaner)}}
> {{[2020-04-21 20:06:02,567] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 1 segments in *offset range* [23591841, 23621641). (kafka.log.LogCleaner)}}
> {{[2020-04-21 20:43:04,309] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-s2r1-0 for 1 segments in *offset range* [2687968, 2732498). (kafka.log.LogCleaner)}}
>
> h4. The topic gets about 100 messages/minute, and its config is:
> {{Topic: elauneind-firebolt-messages-sfo PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=1,cleanup.policy=compact,delete,segment.bytes=10240,retention.ms=90,message.format.version=2.3-IV1,min.compaction.lag.ms=30,min.cleanable.dirty.ratio=0.2,unclean.leader.election.enable=true,retention.bytes=1073741824}}
> {{Topic: elauneind-firebolt-messages-sfo Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2}}
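One detail visible in the log lines quoted in KAFKA-9899 is that the "0 segments" entries report a range whose end offset is lower than its start offset (for example `[22943108, 22912825)`), while the "1 segments" entries do not. The sketch below, with a hypothetical class name and a regex written against the quoted line format only, shows one way such entries could be picked out of `log-cleaner.log`. It makes no claim about the cause of the loop.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CleanerRangeCheck {
    // Matches the "for N segments in offset range [start, end)" part of the
    // log lines quoted in the report. Assumed format, not taken from Kafka source.
    private static final Pattern RANGE = Pattern.compile(
        "for (\\d+) segments in offset range \\[(\\d+), (\\d+)\\)");

    // Returns true when the line contains a range whose end is below its start.
    public static boolean isInverted(String line) {
        Matcher m = RANGE.matcher(line);
        if (!m.find()) {
            return false; // not an "offset range" line
        }
        long start = Long.parseLong(m.group(2));
        long end = Long.parseLong(m.group(3));
        return end < start;
    }

    public static void main(String[] args) {
        String stuck = "[2020-04-21 22:22:59,862] INFO Cleaner 0: Building offset map for log "
            + "elauneind-firebolt-messages-sfo-0 for 0 segments in offset range "
            + "[22943108, 22912825). (kafka.log.LogCleaner)";
        String normal = "[2020-04-21 20:06:02,567] INFO Cleaner 0: Building offset map for log "
            + "elauneind-firebolt-messages-sfo-0 for 1 segments in offset range "
            + "[23591841, 23621641). (kafka.log.LogCleaner)";

        System.out.println(isInverted(stuck));  // true
        System.out.println(isInverted(normal)); // false
    }
}
```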
[GitHub] [kafka] nizhikov commented on pull request #8695: KAFKA-9320: KIP-573 - Enable TLSv1.3 by default
nizhikov commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-631614084

@ijuma I can't see a downside in forcing usage of the latest TLS version. Added this change to the PR.
[GitHub] [kafka] nizhikov commented on a change in pull request #8695: KAFKA-9320: KIP-573 - Enable TLSv1.3 by default
nizhikov commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r428173640

## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ##
@@ -622,6 +622,34 @@ public void testUnsupportedTLSVersion() throws Exception {
 server.verifyAuthenticationMetrics(0, 1);
 }

+/**
+ * Tests that connections can be made with TLSv1.2 and custom cipher suite.
+ */
+@Test
+public void testCiphersSuiteForTLSv1_2() throws Exception {
+String node = "0";
+SSLContext context = SSLContext.getInstance(tlsProtocol);
+context.init(null, null, null);
+
+//Note, that only some ciphers works out of the box. Others requires additional configuration.
+String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
+
+sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(",")));

Review comment:
Tests added.
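For readers who want to try a similar combination outside the test harness, the sketch below assembles the equivalent client settings as plain properties. It is an illustration only. The class and method names are hypothetical, and pinning `ssl.enabled.protocols` to TLSv1.2 is a simplifying assumption (the test above uses the default enabled-protocol list instead). The property keys are the standard Kafka SSL configuration names.

```java
import java.util.Properties;

public class Tls12ClientProps {
    // Builds client properties that request TLSv1.2 with a single cipher suite.
    // Keystore and truststore settings are deliberately omitted from this sketch.
    public static Properties build(String cipherSuite) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.protocol", "TLSv1.2");
        // Assumption: restrict the handshake to TLSv1.2 only.
        props.setProperty("ssl.enabled.protocols", "TLSv1.2");
        props.setProperty("ssl.cipher.suites", cipherSuite);
        return props;
    }

    public static void main(String[] args) {
        // Same cipher suite as in the test in the diff above.
        Properties props = build("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Whether a given cipher suite works depends on the JVM and on the key type in the keystore, which is the point of the comment in the diff about ciphers needing additional configuration.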