[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
kkonstantine commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570747258 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaBasedLog.class) +@PowerMockIgnore("javax.management.*") +public class SharedTopicAdminTest { + +private static final Map CONFIG = Collections.emptyMap(); + +@Mock private TopicAdmin mockTopicAdmin; +private SharedTopicAdmin sharedAdmin; +private int created = 0; Review comment: I know. It's just that we already use a mocking framework and we could use something like: `EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();` if we also defined `factory` to be a mock as well. That could allow us to evaluate expectations on the mock more accurately (e.g. with a capture if we had to). But sure, if we need something quick and easy we can go with that. It's just that I noticed a mixed use of mocks with this variable that simulates what the mocking framework offers already. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
kkonstantine commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570728906 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers. + * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin} + * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}. 
+ * + * The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)} + * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this + * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + * This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object, + * until this object is closed, at which point it cannot be used anymore + */ +public class SharedTopicAdmin implements AutoCloseable, Supplier { + +// Visible for testing +static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE); + +private final Map adminProps; +private final AtomicReference admin = new AtomicReference<>(); +private final AtomicBoolean closed = new AtomicBoolean(false); +private final Function, TopicAdmin> factory; + +public SharedTopicAdmin(Map adminProps) { +this(adminProps, TopicAdmin::new); +} + +// Visible for testing +SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) { +this.adminProps = Objects.requireNonNull(adminProps); +this.factory = Objects.requireNonNull(factory); +} + +/** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ +@Override +public TopicAdmin get() { +return topicAdmin(); +} + +/** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ +public TopicAdmin topicAdmin() { +return admin.updateAndGet(this::createAdmin); Review comment: I'm happy to leave it as an example of the pattern that demonstrates how to apply `updateAndGet`. I just didn't feel that the two or three levels of indirection were worth to write the singleton pattern differently. 
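To make the trade-off in the comment above concrete, here is a minimal sketch of the two ways a lazily created shared instance could be written. `LazySharedHolder` and `SynchronizedLazyHolder` are hypothetical, simplified stand-ins and not the actual `SharedTopicAdmin` from the PR. One caveat that is documented for `AtomicReference.updateAndGet`: the update function may be re-applied under contention, so the factory in the first variant could run more than once, with the extra instance discarded.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical holder using updateAndGet; illustrative only, not Kafka's SharedTopicAdmin.
final class LazySharedHolder<T> implements Supplier<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Supplier<T> factory;

    LazySharedHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public T get() {
        // The update function keeps an existing instance and creates one otherwise.
        return ref.updateAndGet(this::createIfAbsent);
    }

    private T createIfAbsent(T existing) {
        if (closed.get()) {
            throw new IllegalStateException("holder has already been closed");
        }
        return existing != null ? existing : factory.get();
    }

    void close() {
        closed.set(true);
        ref.set(null);
    }
}

// The same behavior written as a plain synchronized lazy initializer.
final class SynchronizedLazyHolder<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T instance;
    private boolean closed;

    SynchronizedLazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized T get() {
        if (closed) {
            throw new IllegalStateException("holder has already been closed");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    synchronized void close() {
        closed = true;
        instance = null;
    }
}
```

Both variants hand every caller the same instance until `close()` is called; the synchronized form has fewer moving parts, while the atomic form avoids holding a lock in `get()`.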
[GitHub] [kafka] wcarlson5 commented on pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7
wcarlson5 commented on pull request #10060: URL: https://github.com/apache/kafka/pull/10060#issuecomment-773820050 +1 from me. Thanks for the PR.
[GitHub] [kafka] hachikuji commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
hachikuji commented on a change in pull request #10045: URL: https://github.com/apache/kafka/pull/10045#discussion_r570730096 ## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.controller.KafkaController +import kafka.network.RequestChannel +import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.common.requests.AbstractResponse + +sealed trait MetadataSupport { + /** + * Provide a uniform way of getting to the ForwardingManager, which is a shared concept + * despite being optional when using ZooKeeper and required when using Raft + */ + val forwardingManager: Option[ForwardingManager] + + /** + * Return this instance downcast for use with ZooKeeper + * + * @param createException function to create an exception to throw + * @return this instance downcast for use with ZooKeeper + * @throws Exception if this instance is not for ZooKeeper + */ + def requireZkOrThrow(createException: => Exception): ZkSupport Review comment: It's a little odd to see this in the trait. Would it be reasonable to turn them into private defs in `KafkaApis`? I'm ok with it if you think it is better. 
We'll only ever have the two implementations.
[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
kkonstantine commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570732565 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -185,16 +188,33 @@ private final DistributedConfig config; +/** + * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs) + * that have the same group ID. + * + * @param config the configuration for the worker; may not be null + * @param time the clock to use; may not be null + * @param worker the {@link Worker} instance to use; may not be null + * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null + * @param statusBackingStore the backing store for statuses; may not be null + * @param configBackingStore the backing store for connector configurations; may not be null + * @param restUrlthe URL of this herder's REST API; may not be null + * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden + *in connector configurations; may not be null + * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, + * after all services and resources owned by this herder are stopped + */ public DistributedHerder(DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { Review comment: We can always keep a constructor with the old signature along with the new if we wanted not to break classes that use `DistributedHerder`. I'm fine with the change here as a short term workaround. 
I guess it saves us one constructor but we can use it only once. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers. + * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin} + * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}. + * + * The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)} + * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this + * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + * This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object, + * until this object is closed, at which point it cannot be used anymore + */ +public class SharedTopicAdmin implements AutoCloseable, Supplier { + +// Visible for testing +static
[GitHub] [kafka] hachikuji commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
hachikuji commented on a change in pull request #10045: URL: https://github.com/apache/kafka/pull/10045#discussion_r570727037 ## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.controller.KafkaController +import kafka.network.RequestChannel +import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.common.requests.AbstractResponse + +sealed trait MetadataSupport { + /** + * Provide a uniform way of getting to the ForwardingManager, which is a shared concept + * despite being optional when using ZooKeeper and required when using Raft + */ + val forwardingManager: Option[ForwardingManager] + + /** + * Return this instance downcast for use with ZooKeeper + * + * @param createException function to create an exception to throw + * @return this instance downcast for use with ZooKeeper + * @throws Exception if this instance is not for ZooKeeper + */ + def requireZkOrThrow(createException: => Exception): ZkSupport + + /** + * Return this instance downcast for use with Raft + * + * @param createException function to create an exception to throw + * @return this instance downcast for use with Raft + * @throws Exception if this instance is not for Raft + */ + def requireRaftOrThrow(createException: => Exception): RaftSupport + + /** + * Confirm that this instance is consistent with the given config + * + * @param config the config to check for consistency with this instance + * @throws IllegalStateException if there is an inconsistency (Raft for a ZooKeeper config or vice-versa) + */ + def confirmConsistentWith(config: KafkaConfig): Unit + + def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit, + responseCallback: Option[AbstractResponse] => Unit): Unit +} + +case class ZkSupport(adminManager: ZkAdminManager, + controller: KafkaController, + zkClient: KafkaZkClient, + forwardingManager: Option[ForwardingManager]) extends MetadataSupport { + val adminZkClient = new AdminZkClient(zkClient) + + override def requireZkOrThrow(createException: => Exception): ZkSupport = this + override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw 
createException + + override def confirmConsistentWith(config: KafkaConfig): Unit = { +if (!config.requiresZookeeper) { + throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper") +} + } + + override def maybeForward(request: RequestChannel.Request, +handler: RequestChannel.Request => Unit, +responseCallback: Option[AbstractResponse] => Unit): Unit = { +if (forwardingManager.isDefined && !request.isForwarded && !controller.isActive) { Review comment: nit: usually when you see `isDefined` followed by `get`, there is likely an opportunity for a `match` or `foreach`.
```scala
forwardingManager match {
  // Use the bound manager directly instead of calling forwardingManager.get
  case Some(mgr) if !request.isForwarded && !controller.isActive =>
    mgr.forwardRequest(request, responseCallback)
  case _ =>
    handler(request)
}
```
## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific lang
[GitHub] [kafka] bob-barrett commented on pull request #10065: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett commented on pull request #10065: URL: https://github.com/apache/kafka/pull/10065#issuecomment-773800777 This is a backport of 131d475 to 2.4. As with #10061, the conflicts were due to the missing connection timeout config, which was added in 2.7, and didn't meaningfully change the behavior.
[GitHub] [kafka] bob-barrett commented on pull request #10067: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett commented on pull request #10067: URL: https://github.com/apache/kafka/pull/10067#issuecomment-773800834 This is a backport of 131d475 to 2.3. As with #10061, the conflicts were due to the missing connection timeout config, which was added in 2.7, and didn't meaningfully change the behavior.
[GitHub] [kafka] bob-barrett opened a new pull request #10067: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett opened a new pull request #10067: URL: https://github.com/apache/kafka/pull/10067 This patch changes the NetworkClient behavior to re-resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This handles the scenario in which the node's IP addresses have changed during the lifetime of the connection: the client no longer has to attempt every stale address before resolving the hostname again. Reviewers: Mickael Maison, Satish Duggana, David Jacot ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
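The behavior change described in this PR can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual `NetworkClient` code: `NodeAddresses`, its method names, and the injected `resolver` function (standing in for a DNS lookup such as `InetAddress.getAllByName`) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical tracker of the resolved addresses for one node; illustrative only.
final class NodeAddresses {
    private final String host;
    private final Function<String, List<String>> resolver; // stand-in for a DNS lookup
    private List<String> addresses;
    private int index;

    NodeAddresses(String host, Function<String, List<String>> resolver) {
        this.host = host;
        this.resolver = resolver;
        resolve();
    }

    private void resolve() {
        List<String> resolved = new ArrayList<>(resolver.apply(host));
        if (resolved.isEmpty()) {
            throw new IllegalStateException("no addresses resolved for " + host);
        }
        addresses = resolved;
        index = 0;
    }

    String current() {
        return addresses.get(index);
    }

    // A connection attempt failed: move to the next previously resolved address,
    // and resolve again only once the list is exhausted.
    void connectFailed() {
        index++;
        if (index >= addresses.size()) {
            resolve();
        }
    }

    // An established connection was lost: resolve again immediately, so that
    // addresses that went stale during the connection's lifetime are dropped.
    void disconnected() {
        resolve();
    }
}
```

In this sketch, `disconnected()` is the new path: without it, a client would walk through every cached address via `connectFailed()` before picking up a changed DNS record.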
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment:

The main motivation to add `ProcessorContext#currentSystemTime()` was to be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if we return the cached time from `AbstractProcessorContext`, we will still be able to return the mocked time, because we update the cached time based on the mocked time in `TopologyTestDriver`.
- `InternalMockProcessorContext` is just for our own unit testing -- it's fine to add the new `Time` field, it's not a public-facing change anyway
- Originally we changed `ProcessorContextImpl` because we removed `AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we should keep the cached time in `AbstractProcessorContext`, and thus we don't need `ProcessorContextImpl#currentSystemTime()` any longer.
- `GlobalProcessorContextImpl` is a different code path, and thus the changes of this PR are fine

> If we add new cachedSystemTimeMs field in AbstractProcessorContext, when do you want to return this field?

Yes, we want to return this field. (We get this behavior by adding back `AbstractProcessorContext#currentSystemTime()` (and the cached time in this class) and removing `ProcessorContextImpl#currentSystemTime()`.)

> Are earlier changes not valid to return time from StreamTask?

Yes, I think we can revert all changes from `StreamTask`. Does this make sense?
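A minimal sketch of the cached-time idea from the comment above, assuming a context that stores the last observed clock value and a runtime (or test driver) that refreshes it. `CachingContext`, `refreshSystemTime`, and the `LongSupplier` clock are hypothetical names chosen for illustration; they are not the Kafka Streams classes under review.

```java
import java.util.function.LongSupplier;

// Hypothetical context that returns a cached wall-clock time; illustrative only.
final class CachingContext {
    private final LongSupplier clock; // real wall clock, or a mocked clock in tests
    private long cachedSystemTimeMs;

    CachingContext(LongSupplier clock) {
        this.clock = clock;
        this.cachedSystemTimeMs = clock.getAsLong();
    }

    // Assumed to be called by the runtime or the test driver whenever time advances.
    void refreshSystemTime() {
        cachedSystemTimeMs = clock.getAsLong();
    }

    // Returns the cached value rather than reading the clock on every call.
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}
```

Because the cache is refreshed from whatever clock was injected, a test that injects a mocked clock still observes the mocked time through `currentSystemTimeMs()`, which is the property the comment relies on.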
[GitHub] [kafka] hachikuji opened a new pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes
hachikuji opened a new pull request #10066: URL: https://github.com/apache/kafka/pull/10066

With KIP-500, we have more complex requirements on API accessibility. Previously all APIs were accessible on every listener exposed by the broker, but now that is no longer true. For example:
- the controller exposes some APIs which are not accessible on the broker listener (e.g. quorum/registration/heartbeat APIs)
- most of the client APIs are not exposed on the controller (e.g. consumer group APIs)
- there are some APIs which are not implemented by the KIP-500 broker (e.g. `LeaderAndIsr` and `UpdateMetadata`)
- there are some APIs which are only implemented by the KIP-500 broker (e.g. `DecommissionBroker` and `DescribeQuorum`)

All of this means that we need more sophistication in how we expose APIs and keep them consistent with the `ApiVersions` API. Up to now, we have been working around this using the `controllerOnly` flag inside `ApiKeys`, but this is not rich enough to support all of the cases listed above.

In this patch, we address this problem by introducing a new `scope` field to the request schema definitions. This field is an array of strings which indicate the scope in which the API should be exposed. We currently support the following scopes:
- `zkBroker`: old broker
- `broker`: KIP-500 broker
- `controller`: KIP-500 controller
- `raft`: raft test server

For example, the `DecommissionBroker` API has the following scope tag:
```json
"scope": ["broker", "controller"]
```
This indicates that the API is only on the KIP-500 broker and controller (both are needed because the request will be sent by clients and forwarded to the controller).

The patch changes the generator so that the scope definitions are added to `ApiMessageType` and exposed through convenient helpers. At the same time, we have removed the `controllerOnly` flag from `ApiKeys` since now we can identify all controller APIs through the "controller" scope tag.

The rest of the patch is dedicated to ensuring that the API scope is properly set. We have created a new `ApiVersionManager` which encapsulates the creation of the `ApiVersionsResponse` based on the scope. Additionally, `SocketServer` is modified to verify the scope of received requests before forwarding them to the request handler.

We have also fixed a bug in the handling of the `ApiVersionsResponse` prior to authentication. Previously a static response was sent, which means that changes to features would not get reflected. This also meant that the logic to ensure that only the intersection of version ranges supported by the controller would get exposed did not work. I think this is important because some clients rely on the initial pre-authenticated `ApiVersions` response rather than doing a second round after authentication as the Java client does.

One final cleanup note: I have removed the expectation that envelope requests are only allowed on "privileged" listeners. This made sense initially because we expected to use forwarding before the KIP-500 controller was available. That is not the case anymore and we expect the `Envelope` API to only be exposed on the controller listener. I have nevertheless preserved the existing workarounds to allow this API to verify forwarding behavior in integration testing.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
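The scope idea in this PR description could be modeled along these lines. This is only a sketch: `ListenerScope` and `ApiScopeRegistry` are hypothetical names, not the generated `ApiMessageType` helpers, and the only scope assignment taken from the description is `DecommissionBroker` with `broker` and `controller`. Any other registration a caller makes would be their own assumption.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// The four scopes listed in the description, as a hypothetical enum.
enum ListenerScope { ZK_BROKER, BROKER, CONTROLLER, RAFT }

// Hypothetical registry mapping API names to the scopes that expose them.
final class ApiScopeRegistry {
    private final Map<String, Set<ListenerScope>> scopesByApi = new LinkedHashMap<>();

    void register(String apiName, ListenerScope first, ListenerScope... rest) {
        scopesByApi.put(apiName, EnumSet.of(first, rest));
    }

    // True only if the API was registered for the given scope.
    boolean isExposed(String apiName, ListenerScope scope) {
        Set<ListenerScope> scopes = scopesByApi.get(apiName);
        return scopes != null && scopes.contains(scope);
    }

    // The API names a listener with this scope would advertise, in registration order.
    List<String> exposedOn(ListenerScope scope) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Set<ListenerScope>> entry : scopesByApi.entrySet()) {
            if (entry.getValue().contains(scope)) {
                names.add(entry.getKey());
            }
        }
        return names;
    }
}
```

With `registry.register("DecommissionBroker", ListenerScope.BROKER, ListenerScope.CONTROLLER)`, a check such as `registry.isExposed("DecommissionBroker", ListenerScope.ZK_BROKER)` is false, which mirrors how a listener could reject requests outside its scope and advertise only the matching APIs.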
[GitHub] [kafka] mjsax merged pull request #10048: MINOR: add docs for KIP-680
mjsax merged pull request #10048: URL: https://github.com/apache/kafka/pull/10048
[GitHub] [kafka] mjsax commented on pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax commented on pull request #10000: URL: https://github.com/apache/kafka/pull/10000#issuecomment-773774753 Updated this PR.
[GitHub] [kafka] mjsax commented on a change in pull request #10048: MINOR: add docs for KIP-680
mjsax commented on a change in pull request #10048: URL: https://github.com/apache/kafka/pull/10048#discussion_r570698424 ## File path: docs/streams/upgrade-guide.html ## @@ -121,6 +121,12 @@ Streams API the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size";>KIP-659 has more details. + +To simplify testing, two new constructors that don't require a Properties parameter have been added to the TopologyTestDriver class. +required to pass in a Properties parameter. If Properties are passed Review comment: ```suggestion to the TopologyTestDriver class. If Properties are passed ```
[GitHub] [kafka] mjsax commented on a change in pull request #10048: MINOR: add docs for KIP-680
mjsax commented on a change in pull request #10048: URL: https://github.com/apache/kafka/pull/10048#discussion_r570698397 ## File path: docs/streams/upgrade-guide.html ## @@ -121,6 +121,12 @@ Streams API the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size";>KIP-659 has more details. + +To simplify testing, two new constructors that don't require a Properties parameter have been added to the TopologyTestDriver class. Review comment: ```suggestion To simplify testing, two new constructors that don't require a Properties parameter have been added ```
[GitHub] [kafka] JimGalasyn commented on a change in pull request #10048: MINOR: add docs for KIP-680
JimGalasyn commented on a change in pull request #10048: URL: https://github.com/apache/kafka/pull/10048#discussion_r570687920 ## File path: docs/streams/upgrade-guide.html ## @@ -121,6 +121,12 @@ Streams API the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size";>KIP-659 has more details. + +To simplify testing, two new constructors are added to TopologyTestDriver class, that both don't +required to pass in a Properties parameter. Furthermore, even if Properties are passed Review comment: ```suggestion required to pass in a Properties parameter. If Properties are passed ```
[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax commented on a change in pull request #1: URL: https://github.com/apache/kafka/pull/1#discussion_r570695899 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -227,6 +230,27 @@ public void initializeIfNeeded() { } } +private void initOffsetsIfNeeded(final java.util.function.Consumer> offsetResetter) { +final Map committed = mainConsumer.committed(resetOffsetsForPartitions); +for (final Map.Entry committedEntry : committed.entrySet()) { +final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue(); +if (offsetAndMetadata != null) { +mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata); +resetOffsetsForPartitions.remove(committedEntry.getKey()); +} +} + +if (!resetOffsetsForPartitions.isEmpty()) { Review comment: Fair enough.
[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax commented on a change in pull request #1: URL: https://github.com/apache/kafka/pull/1#discussion_r570691793 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -227,6 +230,27 @@ public void initializeIfNeeded() { } } +private void initOffsetsIfNeeded(final java.util.function.Consumer> offsetResetter) { Review comment: Good point!
[GitHub] [kafka] mjsax merged pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`
mjsax merged pull request #9997: URL: https://github.com/apache/kafka/pull/9997
[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose
[ https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279291#comment-17279291 ]

Matthias J. Sax commented on KAFKA-7540:

[https://github.com/apache/kafka/pull/9997/checks?check_run_id=1827542034]
{code:java}
org.opentest4j.AssertionFailedError: Assignment did not complete on time ==> expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486)
at kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257)
at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220)
{code}
STDOUT
{code:java}
[2021-02-04 01:55:43,911] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=fatal-exception-test] JoinGroup failed due to fatal error: The consumer group has reached its max size. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:636)
[2021-02-04 01:55:43,913] ERROR [daemon-consumer-assignment]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76) org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group fatal-exception-test already has the configured maximum number of members.
[2021-02-04 01:56:22,198] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] JoinGroup failed due to fatal error: The consumer group has reached its max size. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:636)
[2021-02-04 01:56:22,199] ERROR [daemon-consumer-assignment]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76) org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group group-max-size-test already has the configured maximum number of members.
{code} > Flaky Test ConsumerBounceTest#testClose > --- > > Key: KAFKA-7540 > URL: https://issues.apache.org/jira/browse/KAFKA-7540 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0 >Reporter: John Roesler >Assignee: Jason Gustafson >Priority: Critical > Labels: flaky-test > > Observed on Java 8: > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] > > Stacktrace: > {noformat} > java.lang.ArrayIndexOutOfBoundsException: -1 > at > kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146) > at > kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238) > at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestCla
[GitHub] [kafka] JimGalasyn commented on a change in pull request #10048: MINOR: add docs for KIP-680
JimGalasyn commented on a change in pull request #10048: URL: https://github.com/apache/kafka/pull/10048#discussion_r570687920 ## File path: docs/streams/upgrade-guide.html ## @@ -121,6 +121,12 @@ Streams API the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size";>KIP-659 has more details. + +To simplify testing, two new constructors are added to TopologyTestDriver class, that both don't +required to pass in a Properties parameter. Furthermore, even if Properties are passed Review comment: ```suggestion If Properties are passed ```
[GitHub] [kafka] JimGalasyn commented on a change in pull request #10048: MINOR: add docs for KIP-680
JimGalasyn commented on a change in pull request #10048: URL: https://github.com/apache/kafka/pull/10048#discussion_r570687710 ## File path: docs/streams/upgrade-guide.html ## @@ -121,6 +121,12 @@ Streams API the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size";>KIP-659 has more details. + +To simplify testing, two new constructors are added to TopologyTestDriver class, that both don't Review comment: ```suggestion To simplify testing, two new constructors that don't require a Properties parameter have been added to the TopologyTestDriver class. ```
[GitHub] [kafka] mjsax merged pull request #10044: MINOR: Word count should account for extra whitespaces between words
mjsax merged pull request #10044: URL: https://github.com/apache/kafka/pull/10044
[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279288#comment-17279288 ] Matthias J. Sax commented on KAFKA-12283: - Failed again: https://github.com/apache/kafka/pull/10044/checks?check_run_id=1827160439 > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Chia-Ping Tsai >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira 
(v8.3.4#803005)
[GitHub] [kafka] mjsax merged pull request #10046: MINOR: Extends RocksDB docs
mjsax merged pull request #10046: URL: https://github.com/apache/kafka/pull/10046
[GitHub] [kafka] mjsax commented on a change in pull request #10046: MINOR: Extends RocksDB docs
mjsax commented on a change in pull request #10046: URL: https://github.com/apache/kafka/pull/10046#discussion_r570683354 ## File path: docs/streams/developer-guide/memory-mgmt.html ## @@ -168,7 +168,15 @@ RocksDB Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtable (write buffer). Critical configs (for RocksDB version 4.1.0) include block_cache_size, write_buffer_size and max_write_buffer_number. These can be specified through the -rocksdb.config.setter configuration. +rocksdb.config.setter configuration. + Also, we recommend changing RocksDB's default memory allocator, because the default allocator may lead to increased memory consumption. +To change the memory allocator to jemalloc, you need to set the an environment variable before you start your Kafka Streams application: + +# example: install jemalloc (on Debian) +$ apt install -y libjemalloc-dev +# set LD_PRELOAD before you start your Kafka Streams application +$ LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so” Review comment: Good catch!
[GitHub] [kafka] bob-barrett opened a new pull request #10065: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett opened a new pull request #10065: URL: https://github.com/apache/kafka/pull/10065

This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This handles the scenario in which the node's IP addresses have changed during the lifetime of the connection, so the client no longer has to try every stale address before it resolves the hostname again.

Reviewers: Mickael Maison, Satish Duggana, David Jacot

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
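The behavior change described in this PR can be sketched roughly as follows. This is not the `NetworkClient` code: `NodeAddresses`, its methods, and the `resolver` function (a stand-in for a DNS lookup) are all hypothetical names introduced for illustration. The sketch only assumes that a client caches the resolved addresses for a host and walks through them on failed connection attempts.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical per-node address cache; illustrative only, not Kafka's implementation. */
class NodeAddresses {
    private final String host;
    private final Function<String, List<String>> resolver; // stand-in for a DNS lookup
    private List<String> addresses = Collections.emptyList();
    private int index = 0;

    NodeAddresses(String host, Function<String, List<String>> resolver) {
        this.host = host;
        this.resolver = resolver;
    }

    /** Address to try next; resolves the hostname when nothing is cached. */
    String currentAddress() {
        if (addresses.isEmpty()) {
            List<String> resolved = resolver.apply(host);
            if (resolved == null || resolved.isEmpty()) {
                throw new IllegalStateException("no addresses for " + host);
            }
            addresses = resolved;
            index = 0;
        }
        return addresses.get(index);
    }

    /** A connection attempt failed: move on, and drop the cache once every address was tried. */
    void connectFailed() {
        index++;
        if (index >= addresses.size()) {
            clear();
        }
    }

    /** An established connection was lost: forget the cache so the next attempt re-resolves. */
    void disconnected() {
        clear();
    }

    private void clear() {
        addresses = Collections.emptyList();
        index = 0;
    }
}
```

In this sketch, `connectFailed()` models the existing "re-resolve only when exhausted" path, while `disconnected()` models the path the PR description adds.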
[GitHub] [kafka] bob-barrett commented on pull request #10064: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett commented on pull request #10064: URL: https://github.com/apache/kafka/pull/10064#issuecomment-773736235 This is a backport of 131d4753cfed65ed6dee0a8c754765c97c3d513f to 2.5. As with https://github.com/apache/kafka/pull/10061, the conflicts were due to the missing connection timeout config, which was added in 2.7, and didn't meaningfully change the behavior.
[GitHub] [kafka] bob-barrett opened a new pull request #10064: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett opened a new pull request #10064: URL: https://github.com/apache/kafka/pull/10064

This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This handles the scenario in which the node's IP addresses have changed during the lifetime of the connection, so the client no longer has to try every stale address before it resolves the hostname again.

Reviewers: Mickael Maison, Satish Duggana, David Jacot

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio opened a new pull request #10063: KAFKA-12258: Add support for splitting appending records
jsancio opened a new pull request #10063: URL: https://github.com/apache/kafka/pull/10063

1. Type `BatchAccumulator`. Add support for appending records into one or more batches.
2. Type `RaftClient`. Rename `scheduleAppend` to `scheduleAtomicAppend`.
3. Type `RaftClient`. Add a new method `scheduleAppend` which appends records to the log using as many batches as necessary.
4. Increase the batch size from 1MB to 8MB.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
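The difference between the atomic and the splitting append described in this PR can be sketched as below. This is an illustration only: `BatchSplitter`, `splitIntoBatches`, and `atomicBatch` are hypothetical names, and measuring batch size as the sum of record byte lengths is an assumption, not something stated in the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper contrasting the two append modes; not the BatchAccumulator code. */
class BatchSplitter {

    /** Greedily packs records, in order, into as many batches as needed. */
    static List<List<byte[]>> splitIntoBatches(List<byte[]> records, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] record : records) {
            if (record.length > maxBatchBytes) {
                throw new IllegalArgumentException("record larger than a whole batch");
            }
            if (currentBytes + record.length > maxBatchBytes) {
                // The current batch is full: close it and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += record.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    /** All-or-nothing variant: every record must fit into a single batch. */
    static List<byte[]> atomicBatch(List<byte[]> records, int maxBatchBytes) {
        long total = 0;
        for (byte[] record : records) {
            total += record.length;
        }
        if (total > maxBatchBytes) {
            throw new IllegalArgumentException("records do not fit in one batch");
        }
        return new ArrayList<>(records);
    }
}
```

With three 4-byte records and an 8-byte limit, the splitting variant would produce two batches while the atomic variant would reject the append.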
[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
rhauch commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570657028 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers. + * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin} + * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}. 
+ * + * The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)} + * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this + * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods, + * nor any previously returned {@link TopicAdmin} instances may be used. + * + * This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object, + * until this object is closed, at which point it cannot be used anymore + */ +public class SharedTopicAdmin implements AutoCloseable, Supplier { + +// Visible for testing +static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE); + +private final Map adminProps; +private final AtomicReference admin = new AtomicReference<>(); +private final AtomicBoolean closed = new AtomicBoolean(false); +private final Function, TopicAdmin> factory; + +public SharedTopicAdmin(Map adminProps) { +this(adminProps, TopicAdmin::new); +} + +// Visible for testing +SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) { +this.adminProps = Objects.requireNonNull(adminProps); +this.factory = Objects.requireNonNull(factory); +} + +/** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ +@Override +public TopicAdmin get() { +return topicAdmin(); +} + +/** + * Get the shared {@link TopicAdmin} instance. + * + * @return the shared instance; never null + * @throws ConnectException if this object has already been closed + */ +public TopicAdmin topicAdmin() { +return admin.updateAndGet(this::createAdmin); Review comment: I'm not sure there is much advantage either way, considering these methods are not called frequently and `synchronized` would indeed work. 
I personally like the simplicity of using `AtomicReference`, which to me seemed natural and straightforward, avoided having to synchronize the entire methods, and needed no if-checks in this method.
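The `AtomicReference` approach discussed in this comment can be sketched in isolation as follows. `LazyShared` is a hypothetical generic holder written for this illustration; it is not the `SharedTopicAdmin` class from the PR, whose `createAdmin` method is not shown in the quoted diff. One point worth noting, per the `AtomicReference.updateAndGet` Javadoc, is that the update function may be re-applied under contention, so the factory can occasionally run more than once even though only one result is published.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical lazily-initialized shared holder; illustrative, not the PR's class. */
class LazyShared<T> implements Supplier<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Supplier<T> factory;

    LazyShared(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it on first use. */
    @Override
    public T get() {
        return ref.updateAndGet(existing -> {
            if (closed.get()) {
                throw new IllegalStateException("holder has already been closed");
            }
            // Keep the published instance if there is one; otherwise create it.
            return existing != null ? existing : factory.get();
        });
    }

    /** Marks the holder closed and drops the shared instance; later get() calls fail. */
    void close() {
        if (closed.compareAndSet(false, true)) {
            ref.set(null);
        }
    }
}
```

A `synchronized` method with a null check would behave the same for callers; the sketch is only meant to show why no explicit if-check is needed in the getter itself.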
[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
rhauch commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570655008 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaBasedLog.class) +@PowerMockIgnore("javax.management.*") +public class SharedTopicAdminTest { + +private static final Map CONFIG = Collections.emptyMap(); + +@Mock private TopicAdmin mockTopicAdmin; +private SharedTopicAdmin sharedAdmin; +private int created = 0; Review comment: Really I'm just using that to be able to test that the new `topicAdmin()` method is returning the correct instance, even after repeated calls. It was an easy way to verify that the `TopicAdmin` matches what the factory function returned.
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r570654953 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) 
+.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { Review comment: Sounds good
[jira] [Created] (KAFKA-12295) Shallow Mirroring
Henry Cai created KAFKA-12295:

Summary: Shallow Mirroring
Key: KAFKA-12295
URL: https://issues.apache.org/jira/browse/KAFKA-12295
Project: Kafka
Issue Type: Improvement
Components: consumer, core, mirrormaker, producer
Reporter: Henry Cai
Assignee: Henry Cai
Fix For: 2.8.0

KIP-712: https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
rhauch commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570652791 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java ## @@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() { } } +@Test +public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +ConnectException e = assertThrows(ConnectException.class, () -> { +admin.endOffsets(tps); +}); +assertTrue(e.getMessage().contains("Not authorized to get the end offsets")); +} +} + +@Test +public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +ConnectException e = assertThrows(ConnectException.class, () -> { 
+admin.endOffsets(tps); +}); +assertTrue(e.getMessage().contains("is unsupported on brokers")); +} +} + +@Test +public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +RetriableException e = assertThrows(RetriableException.class, () -> { +admin.endOffsets(tps); +}); +assertTrue(e.getMessage().contains("Timed out while waiting")); +} +} + +@Test +public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +ConnectException e = assertThrows(ConnectException.class, () -> { +admin.endOffsets(tps); +}); +assertTrue(e.getMessage().contains("Error while getting end offsets for topic")); +} +} + +@Test +public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() { +String topicName = "myTopic"; +Cluster cluster = createCluster(1, topicName, 
1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +Map offsets = admin.endOffsets(Collections.emptySet()); +assertTrue(o
[GitHub] [kafka] kkonstantine merged pull request #10053: KAFKA-10834: Remove redundant type casts in Connect
kkonstantine merged pull request #10053: URL: https://github.com/apache/kafka/pull/10053 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine merged pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API (KIP-661)
kkonstantine merged pull request #9726: URL: https://github.com/apache/kafka/pull/9726
[GitHub] [kafka] kkonstantine commented on pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API
kkonstantine commented on pull request #9726: URL: https://github.com/apache/kafka/pull/9726#issuecomment-773706451 One failure on an unrelated flaky test. Merging now to coordinate with another PR. Thanks @mimaison!
[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
rhauch commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570648556 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -185,16 +188,33 @@ private final DistributedConfig config; +/** + * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs) + * that have the same group ID. + * + * @param config the configuration for the worker; may not be null + * @param time the clock to use; may not be null + * @param worker the {@link Worker} instance to use; may not be null + * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null + * @param statusBackingStore the backing store for statuses; may not be null + * @param configBackingStore the backing store for connector configurations; may not be null + * @param restUrl the URL of this herder's REST API; may not be null + * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden + * in connector configurations; may not be null + * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, + * after all services and resources owned by this herder are stopped + */ public DistributedHerder(DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { Review comment: The reason I used a variadic array here was to avoid having to create a new constructor when no `AutoCloseable` instances are supplied. 
If we use a `List`, we can update the call sites in the Connect runtime and in MirrorMaker 2, but any other caller will break unless we also keep the old signature. WDYT?
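The trade-off in this comment can be illustrated with a small, self-contained sketch. The classes `VarargsHerder` and `ListHerder` are hypothetical stand-ins for illustration only, not the real `DistributedHerder`, and the REST URL is an assumed placeholder value. The sketch shows that a trailing varargs parameter keeps existing call sites compiling, while a `List` parameter forces every caller to pass something, for example `Collections.singletonList(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UponShutdownSketch {

    /** Hypothetical herder with a trailing varargs parameter, as in the diff. */
    static final class VarargsHerder {
        private final List<AutoCloseable> uponShutdown;

        VarargsHerder(String restUrl, AutoCloseable... uponShutdown) {
            // Callers that pass no closeables get an empty array here.
            this.uponShutdown = Arrays.asList(uponShutdown);
        }

        int closeableCount() {
            return uponShutdown.size();
        }
    }

    /** Hypothetical herder that takes an explicit List instead. */
    static final class ListHerder {
        private final List<AutoCloseable> uponShutdown;

        ListHerder(String restUrl, List<AutoCloseable> uponShutdown) {
            this.uponShutdown = new ArrayList<>(uponShutdown);
        }

        int closeableCount() {
            return uponShutdown.size();
        }
    }

    public static void main(String[] args) {
        AutoCloseable sharedAdmin = () -> { };
        String restUrl = "http://localhost:8083"; // assumed placeholder value

        // Old-style call site: still compiles against the varargs signature.
        System.out.println(new VarargsHerder(restUrl).closeableCount());
        // New-style call site for the varargs signature.
        System.out.println(new VarargsHerder(restUrl, sharedAdmin).closeableCount());
        // With a List parameter, every caller must supply the argument explicitly.
        System.out.println(new ListHerder(restUrl, Collections.singletonList(sharedAdmin)).closeableCount());
    }
}
```

With the `List` variant, a caller written against the old signature no longer compiles unless an overload without the extra parameter is kept, which is the compatibility concern raised above.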
[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe split() operator
mjsax commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-773692994 Merged to `trunk`. Congrats on getting this into the 2.8.0 release @inponomarev -- great work!
[GitHub] [kafka] mjsax merged pull request #9107: KAFKA-5488: Add type-safe split() operator
mjsax merged pull request #9107: URL: https://github.com/apache/kafka/pull/9107
[GitHub] [kafka] rodesai commented on a change in pull request #10046: MINOR: Extends RocksDB docs
rodesai commented on a change in pull request #10046: URL: https://github.com/apache/kafka/pull/10046#discussion_r570627385 ## File path: docs/streams/developer-guide/memory-mgmt.html ## @@ -168,7 +168,15 @@ RocksDB Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtable (write buffer). Critical configs (for RocksDB version 4.1.0) include block_cache_size, write_buffer_size and max_write_buffer_number. These can be specified through the -rocksdb.config.setter configuration. +rocksdb.config.setter configuration. + Also, we recommend changing RocksDB's default memory allocator, because the default allocator may lead to increased memory consumption. +To change the memory allocator to jemalloc, you need to set the an environment variable before you start your Kafka Streams application: + +# example: install jemalloc (on Debian) +$ apt install -y libjemalloc-dev +# set LD_PRELOAD before you start your Kafka Streams application +$ LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so” Review comment: A bare `LD_PRELOAD=...` assignment on its own line only sets an unexported shell variable, so the Kafka Streams application started afterwards never sees it. This should either be: ``` $ export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so" $ <command that starts your Kafka Streams application> ``` OR ``` $ LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so" <command that starts your Kafka Streams application> ```
[GitHub] [kafka] gardnervickers closed pull request #10058: [2.5] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers closed pull request #10058: URL: https://github.com/apache/kafka/pull/10058
[GitHub] [kafka] gardnervickers closed pull request #10057: [2.4] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers closed pull request #10057: URL: https://github.com/apache/kafka/pull/10057
[GitHub] [kafka] gardnervickers closed pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers closed pull request #10055: URL: https://github.com/apache/kafka/pull/10055
[GitHub] [kafka] bob-barrett commented on pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
bob-barrett commented on pull request #10055: URL: https://github.com/apache/kafka/pull/10055#issuecomment-773678966 @ableegoldman the 2.6 backport is https://github.com/apache/kafka/pull/10061.
[GitHub] [kafka] szpak opened a new pull request #10062: MINOR: Add performAndClose default method in KeyValueIterator
szpak opened a new pull request #10062: URL: https://github.com/apache/kafka/pull/10062

This PR aims to make it more likely that a `KeyValueIterator` is closed after use by adding a convenient `performAndClose()` default method, which executes a given operation and guarantees that the iterator is closed right afterwards.

### Rationale

I decided to create this PR after observing, in different projects, how often an iterator is left open after operations such as `.forEachRemaining(...)` (who reads JavaDoc after all? ;-) ). For people aware of the problem, instead of verbose try-with-resources constructions repeated throughout the code:

```
try (KeyValueIterator ordersKeyValueIterator = streamsBuilderFactoryBean
        .getKafkaStreams()
        .store(StoreQueryParameters.fromNameAndType(ORDERS_STORE, QueryableStoreTypes.keyValueStore()))
        .all()) {
    ordersKeyValueIterator
        .forEachRemaining(kv -> { ... });
}
```

(or hidden in in-house utility classes), a developer using Kafka Streams gets a clear, built-in way to perform operations on the elements of an iterator and have that iterator closed automatically afterwards:

```
streamsBuilderFactoryBean
    .getKafkaStreams()
    .store(StoreQueryParameters.fromNameAndType(ORDERS_STORE, QueryableStoreTypes.keyValueStore()))
    .all()
    .performAndClose(iterator -> iterator
        .forEachRemaining(kv -> { ... })
    );
```

I believe it can increase the share of properly closed iterators in user projects.

### Testing approach

Since this is a default method in `KeyValueIterator`, I decided to test it using `KeyValueIteratorFacade`, which already provides a nice unit-testing infrastructure with a mocked iterator. I don't know the implementation details, but I expect it to behave similarly in other implementations. I may have missed some cases, so feel free to point them out and I will happily cover them with tests.

### Rejected extensions

For some time I was also thinking about covering operations that return a value. However, at least in the projects where I have observed Kafka Streams in use, performing side effects is more common, and an implementation for value-returning use cases might be trickier, so I decided to start with a plain `Consumer`.

### Other information

Naming is not my area of expertise. Feel free to propose a better name or any other enhancement to my PR :-).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
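The idea described in this PR can be sketched without Kafka on the classpath. The interface `ClosableIterator` and the class `ListBackedIterator` below are hypothetical stand-ins for `KeyValueIterator` and one of its implementations; the actual signature proposed in the PR is not shown in this message, so the `Consumer`-based form is an assumption based on the description.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PerformAndCloseSketch {

    /** Hypothetical stand-in for KeyValueIterator: an iterator that must be closed. */
    interface ClosableIterator<T> extends Iterator<T>, Closeable {
        @Override
        void close(); // narrowed: no checked exception

        /** Runs the operation, then closes this iterator even if the operation throws. */
        default void performAndClose(Consumer<ClosableIterator<T>> operation) {
            try (ClosableIterator<T> self = this) {
                operation.accept(self);
            }
        }
    }

    /** Simple in-memory implementation that records whether close() was called. */
    static final class ListBackedIterator<T> implements ClosableIterator<T> {
        private final Iterator<T> delegate;
        private boolean closed;

        ListBackedIterator(List<T> items) {
            this.delegate = items.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    public static void main(String[] args) {
        ListBackedIterator<String> orders = new ListBackedIterator<>(Arrays.asList("a", "b"));
        List<String> seen = new ArrayList<>();
        orders.performAndClose(it -> it.forEachRemaining(seen::add));
        System.out.println(seen + " closed=" + orders.isClosed());
    }
}
```

Because the default method wraps the call in try-with-resources, the iterator is closed on both the normal and the exceptional path, which is the guarantee the PR description asks for.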
[GitHub] [kafka] bob-barrett commented on pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett commented on pull request #10061: URL: https://github.com/apache/kafka/pull/10061#issuecomment-773678032 @dajac This is the backport of https://github.com/apache/kafka/commit/131d4753cfed65ed6dee0a8c754765c97c3d513f. The conflicts were because the connection timeout settings added by [KIP-601](https://cwiki.apache.org/confluence/display/KAFKA/KIP-601%3A+Configurable+socket+connection+timeout+in+NetworkClient) aren't present in 2.6. This didn't change the behavior of this patch, but it did require a slight change in `NetworkClientTest#testFailedConnectionToFirstAddress` to simulate a failed connection, since sleeping for the connection timeout wasn't an option.
[GitHub] [kafka] bob-barrett opened a new pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects (#9902)
bob-barrett opened a new pull request #10061: URL: https://github.com/apache/kafka/pull/10061 This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This handles the scenario in which the node's IP addresses have changed during the lifetime of the connection, and means that the client does not have to try every stale IP address before it re-resolves the hostname. Reviewers: Mickael Maison, Satish Duggana, David Jacot ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
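The behavior change described in this PR can be sketched with a toy model. The class `NodeAddresses`, its method names, and the string-based resolver are all hypothetical and exist only for illustration; this is not the `NetworkClient` or `ClusterConnectionStates` code, and the host name and addresses are made-up values. The point is only the rule: a failed attempt moves to the next cached address, while a disconnect from an established connection drops the cache so the next attempt resolves again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ReResolveSketch {

    /** Hypothetical per-node address cache; addresses are plain strings to avoid real DNS. */
    static final class NodeAddresses {
        private final String host;
        private final Function<String, List<String>> resolver;
        private List<String> addresses = new ArrayList<>();
        private int index = 0;

        NodeAddresses(String host, Function<String, List<String>> resolver) {
            this.host = host;
            this.resolver = resolver;
        }

        /** Returns the address to try next, resolving the host if nothing is cached. */
        String currentAddress() {
            if (addresses.isEmpty()) {
                addresses = new ArrayList<>(resolver.apply(host));
                index = 0;
                if (addresses.isEmpty()) {
                    throw new IllegalStateException("No addresses found for " + host);
                }
            }
            return addresses.get(index);
        }

        /** A connection attempt failed: move on, and re-resolve once the list is exhausted. */
        void connectionAttemptFailed() {
            index++;
            if (index >= addresses.size()) {
                clear();
            }
        }

        /** An established connection was lost: drop the cache so the next attempt re-resolves. */
        void disconnectedAfterEstablished() {
            clear();
        }

        private void clear() {
            addresses = new ArrayList<>();
            index = 0;
        }
    }

    public static void main(String[] args) {
        // Mutable holder simulating a DNS answer that changes over time.
        List<List<String>> dns = new ArrayList<>();
        dns.add(Arrays.asList("10.0.0.1", "10.0.0.2"));
        NodeAddresses node = new NodeAddresses("broker.example", h -> dns.get(0));

        System.out.println(node.currentAddress());
        dns.set(0, Arrays.asList("10.0.0.9")); // the node's IP changes while connected
        node.disconnectedAfterEstablished();
        System.out.println(node.currentAddress());
    }
}
```

Without the re-resolve on disconnect, this model would first walk through the remaining stale entries before asking the resolver again, which is the delay the patch is meant to avoid.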
[GitHub] [kafka] gardnervickers commented on pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers commented on pull request #10055: URL: https://github.com/apache/kafka/pull/10055#issuecomment-773675865 @ableegoldman I believe @bob-barrett was working on a PR against 2.6.
[GitHub] [kafka] ableegoldman commented on pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
ableegoldman commented on pull request #10055: URL: https://github.com/apache/kafka/pull/10055#issuecomment-773675180 Hey @gardnervickers , do you have plans to open a PR against 2.6 as well? Just checking in since it seems this fix has been ported to everything except 2.6 so far and I want to keep track for the 2.6.2 release
[GitHub] [kafka] ableegoldman opened a new pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7
ableegoldman opened a new pull request #10060: URL: https://github.com/apache/kafka/pull/10060 Port of https://github.com/apache/kafka/pull/9978 to the 2.7 branch
[GitHub] [kafka] dpoldrugo commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
dpoldrugo commented on pull request #10059: URL: https://github.com/apache/kafka/pull/10059#issuecomment-773663807 @ijuma, @omkreddy, @rajinisivaram could you take a look?
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r570605728 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @mjsax can you please explain again what is expected now? To me this seems contrary to the initial KIP. As I understood the KIP, we wanted to return the system time from the stream task. Considering the changes in this PR: 1. We added a time field in `InternalMockProcessorContext`, which we return when `currentSystemTime()` is called. 2. When `ProcessorContextImpl`'s `currentSystemTime()` method is called, we return the time from the `StreamTask`'s time field. 3. We also added a time field in `GlobalProcessorContextImpl`, which we return from `currentSystemTime()`. 4. If we add a new `cachedSystemTimeMs` field in `AbstractProcessorContext`, when do you want this field to be returned? Are the earlier changes, which return the time from `StreamTask`, no longer valid?
[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics
kkonstantine commented on a change in pull request #9780: URL: https://github.com/apache/kafka/pull/9780#discussion_r570584036 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config, String restUrl, ConnectMetrics metrics, Time time, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { Review comment: see comment above ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -185,16 +188,33 @@ private final DistributedConfig config; +/** + * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs) + * that have the same group ID. + * + * @param config the configuration for the worker; may not be null + * @param time the clock to use; may not be null + * @param worker the {@link Worker} instance to use; may not be null + * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null + * @param statusBackingStore the backing store for statuses; may not be null + * @param configBackingStore the backing store for connector configurations; may not be null + * @param restUrlthe URL of this herder's REST API; may not be null + * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden + *in connector configurations; may not be null + * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, + * after all services and resources owned by this herder are stopped + */ public DistributedHerder(DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore 
configBackingStore, String restUrl, - ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) { + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + AutoCloseable... uponShutdown) { Review comment: I think it's better to avoid a variadic argument here. Parameters tend to get added with new features in such constructors. And if a new parameter is required that is also a list, then we'll have a mix of list args with a variadic in the end. Since we transform to list I'd suggest using this type here and pass the single argument with `Collections.singletonList` in the caller. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; + +@RunWith(PowerMockRunner.class) +@Pr
[GitHub] [kafka] cmccabe merged pull request #10030: MINOR: Add KafkaEventQueue
cmccabe merged pull request #10030: URL: https://github.com/apache/kafka/pull/10030
[GitHub] [kafka] cmccabe commented on pull request #10030: MINOR: Add KafkaEventQueue
cmccabe commented on pull request #10030: URL: https://github.com/apache/kafka/pull/10030#issuecomment-773648975 Test failure is `org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest` which is not related.
[GitHub] [kafka] C0urante commented on pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API
C0urante commented on pull request #9726: URL: https://github.com/apache/kafka/pull/9726#issuecomment-773648389 Just ran into a situation last night where this would have been super helpful. Thanks for adding this @mimaison!
[GitHub] [kafka] kkonstantine commented on a change in pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API
kkonstantine commented on a change in pull request #9726: URL: https://github.com/apache/kafka/pull/9726#discussion_r570577845 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ## @@ -188,6 +188,16 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward); } +@GET +@Path("/{connector}/tasks-config") +public Map> getTasksConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, Review comment: That's right 👍
[GitHub] [kafka] dpoldrugo opened a new pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
dpoldrugo opened a new pull request #10059: URL: https://github.com/apache/kafka/pull/10059 Description: As suggested by @omkreddy in this [comment](https://issues.apache.org/jira/browse/KAFKA-8562?focusedCommentId=16912437&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16912437), this PR avoids the (reverse) DNS lookup while building the underlying SslTransportLayer. How the problem manifested: When clients or other brokers connected to a broker using SASL_SSL, the broker performed a (reverse) DNS lookup, and if there was no PTR record, the lookup could last several seconds, which in the end caused high latencies in several parts of the system: replication, consume requests and produce requests. Here you can see a recorded sample: https://user-images.githubusercontent.com/1514332/106959147-9033a580-673a-11eb-9575-4b9fe986cb30.png Also, here is a Wireshark packet capture of the DNS requests; in this case you can see that the lookup lasted more than 11 seconds: ![KAFKA-8562 wireshark dns packet capture](https://user-images.githubusercontent.com/1514332/106960332-37650c80-673c-11eb-91ab-9cab8dd4873d.png) When using PLAINTEXT or SSL, this problem doesn't manifest. Solution: In #2835, @rajinisivaram already added a helper method `SslChannelBuilder.peerHost`, so I just moved it to a new class called `ChannelBuilderUtils` and used it in the `SaslChannelBuilder.buildTransportLayer` method. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
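A minimal sketch of the technique, assuming the helper relies on `InetSocketAddress.getHostString()`, which the JDK documents as not attempting a reverse lookup (unlike `getHostName()`, which may). The class `PeerHostSketch` and its methods are hypothetical; the body of the real `peerHost` helper is not shown in this message, and the IP address is a made-up example.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

public class PeerHostSketch {

    /**
     * Returns a host string for the remote address without triggering a reverse DNS lookup.
     * If the address was created from a literal IP, the IP's string form is returned.
     */
    static String peerHost(InetAddress remoteAddress) {
        return new InetSocketAddress(remoteAddress, 0).getHostString();
    }

    /** Builds an address from raw bytes (no lookup involved) and asks for its peer host. */
    static String demo() {
        try {
            InetAddress remote = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
            return peerHost(remote);
        } catch (UnknownHostException e) {
            // Only thrown for an illegal address length, which cannot happen here.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The port argument `0` is irrelevant to the host string; it is only there because `InetSocketAddress` requires one.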
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r570568151 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel, (responseTopics ++ unauthorizedResponseStatus).toList } - private def createTopic(topic: String, - numPartitions: Int, - replicationFactor: Int, - properties: util.Properties = new util.Properties()): MetadataResponseTopic = { -try { - adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful" -.format(topic, numPartitions, replicationFactor)) - metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList()) -} catch { - case _: TopicExistsException => // let it go, possibly another broker created this topic Review comment: The problem is that `ZkAdminManager.createTopics` only takes a callback instead of telling you synchronously whether we hit `TopicExists`. Right now we do the topic creation asynchronously, so unless this needs to be fixed (today we would just return UNKNOWN_PARTITION, which seems semantically similar to LEADER_NOT_AVAILABLE), I think we could just return unknown partition immediately without waiting for the async creation?
[jira] [Updated] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen updated KAFKA-12294:
Component/s: core
> Consider using the forwarding mechanism for metadata auto topic creation
>
> Key: KAFKA-12294
> URL: https://issues.apache.org/jira/browse/KAFKA-12294
> Project: Kafka
> Issue Type: Sub-task
> Components: core
> Reporter: Boyang Chen
> Priority: Major
>
> Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to improve the topic creation auditing by forwarding the CreateTopicsRequest inside Envelope for the given client. Details in [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-12294: Parent: (was: KAFKA-9705) Issue Type: Improvement (was: Sub-task) > Consider using the forwarding mechanism for metadata auto topic creation > > > Key: KAFKA-12294 > URL: https://issues.apache.org/jira/browse/KAFKA-12294 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Boyang Chen >Priority: Major > > Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to > improve the topic creation auditing by forwarding the CreateTopicsRequest > inside Envelope for the given client. Details in > [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation
Boyang Chen created KAFKA-12294: --- Summary: Consider using the forwarding mechanism for metadata auto topic creation Key: KAFKA-12294 URL: https://issues.apache.org/jira/browse/KAFKA-12294 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to improve the topic creation auditing by forwarding the CreateTopicsRequest inside Envelope for the given client. Details in [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] gardnervickers opened a new pull request #10058: [2.5] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers opened a new pull request #10058: URL: https://github.com/apache/kafka/pull/10058 The original PR was #9902 The reason for back-porting this improved test infrastructure is to address some flakes which we have seen when kafka.apache.org DNS changes, specifically in the ClusterConnectionStatesTest. This change uses a mocked resolver for the multi-ip tests. Related PR: #10055 #10057 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API
mimaison commented on a change in pull request #9726: URL: https://github.com/apache/kafka/pull/9726#discussion_r570545828 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ## @@ -188,6 +188,16 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward); } +@GET +@Path("/{connector}/tasks-config") +public Map> getTasksConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, Review comment: You mean something like this? ```java @GET @Path("/{connector}/tasks-config") public Map> getTasksConfig( final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback>> cb = new FutureCallback<>(); ... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers opened a new pull request #10057: Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers opened a new pull request #10057: URL: https://github.com/apache/kafka/pull/10057 The original PR was #9902 The reason for back-porting this improved test infrastructure is to address some flakes which we have seen when kafka.apache.org DNS changes, specifically in the ClusterConnectionStatesTest. This change uses a mocked resolver for the multi-ip tests. Related PR: #10055 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12293) Remove JCenter and Bintray repositories mentions out of Gradle build (sunset is announced for those repositories)
[ https://issues.apache.org/jira/browse/KAFKA-12293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279162#comment-17279162 ] Dejan Stojadinović commented on KAFKA-12293: Note: at the moment I am not sure about the Kafka release process (i.e. publishing to global public repositories). If Kafka publishes exclusively to JCenter, then this solution should be expanded. > Remove JCenter and Bintray repositories mentions out of Gradle build (sunset > is announced for those repositories) > - > > Key: KAFKA-12293 > URL: https://issues.apache.org/jira/browse/KAFKA-12293 > Project: Kafka > Issue Type: Task > Components: build >Reporter: Dejan Stojadinović >Assignee: Dejan Stojadinović >Priority: Major > > *Intro:* > [https://jfrog.com/blog/into-the-sunset-bintray-jcenter-gocenter-and-chartcenter] > *Quote (from a link above):* > {quote} > May 1st: Bintray, JCenter, GoCenter, and ChartCenter services will no longer > be available > {quote} > *Note:* it seems that Gradle will make some changes in order to resolve > _*jcenter()*_ to _*mavenCentral()*_ by a default: > [https://github.com/gradle/gradle/issues/16018] > I took the liberty to assign this to myself (I already have a few gradle > related commits in Kafka repository; also I have some work-in-progress PR for > this issue that will be pushed asap). > FYI [~ijuma] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10005: MINOR: Add ConfigRepository, use in Partition and KafkaApis
hachikuji merged pull request #10005: URL: https://github.com/apache/kafka/pull/10005 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 opened a new pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
dejan2609 opened a new pull request #10056: URL: https://github.com/apache/kafka/pull/10056 Related jira ticket: https://issues.apache.org/jira/browse/KAFKA-12293 **_Remove JCenter and Bintray repositories mentions out of Gradle build (sunset is announced for those repositories)_** Note: at the moment I am not sure about the Kafka release process (i.e. publishing to global public repositories). If Kafka publishes exclusively to JCenter, then this solution should be expanded. FYI @ijuma This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers opened a new pull request #10055: Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior
gardnervickers opened a new pull request #10055: URL: https://github.com/apache/kafka/pull/10055 The original PR was #9902 The reason for back-porting this improved test infrastructure is to address some flakes which we have seen when kafka.apache.org DNS changes, specifically in the `ClusterConnectionStatesTest`. This change uses a mocked resolver for the multi-ip tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
ableegoldman commented on a change in pull request #10000: URL: https://github.com/apache/kafka/pull/10000#discussion_r570535029 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -227,6 +230,27 @@ public void initializeIfNeeded() { } } +private void initOffsetsIfNeeded(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { +final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(resetOffsetsForPartitions); +for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) { +final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue(); +if (offsetAndMetadata != null) { +mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata); +resetOffsetsForPartitions.remove(committedEntry.getKey()); +} +} + +if (!resetOffsetsForPartitions.isEmpty()) { Review comment: Can we just pass in a no-op lambda instead? I'd rather avoid special handling for null input that isn't supposed to be null, just so we can use null in the tests (which are therefore not realistic tests since it should never be null, no?) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
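The no-op lambda suggested in the review comment above can be sketched as follows. This is a minimal illustration, not the actual `StreamTask` code: the class `OffsetInitSketch`, its `String` partition names, and `pendingCount()` are hypothetical stand-ins, and the callback is assumed to be non-null by contract.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the task under review; all names are illustrative.
class OffsetInitSketch {
    private final Set<String> pendingPartitions = new HashSet<>();

    OffsetInitSketch(Set<String> partitions) {
        pendingPartitions.addAll(partitions);
    }

    // The callback is assumed to be non-null, so no null check is needed here.
    void initOffsetsIfNeeded(Consumer<Set<String>> offsetResetter) {
        if (!pendingPartitions.isEmpty()) {
            // Hand the callback a copy, then mark the partitions as handled.
            offsetResetter.accept(new HashSet<>(pendingPartitions));
            pendingPartitions.clear();
        }
    }

    int pendingCount() {
        return pendingPartitions.size();
    }

    public static void main(String[] args) {
        OffsetInitSketch task =
            new OffsetInitSketch(new HashSet<>(Arrays.asList("topic-0", "topic-1")));
        // A test that does not care about resets passes a no-op lambda, not null.
        task.initOffsetsIfNeeded(partitions -> { });
        System.out.println(task.pendingCount());
    }
}
```

With this shape the production path and the test path exercise the same branch, which is the point the reviewer makes about realistic tests.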
[jira] [Created] (KAFKA-12293) Remove JCenter and Bintray repositories mentions out of Gradle build (sunset is announced for those repositories)
Dejan Stojadinović created KAFKA-12293: -- Summary: Remove JCenter and Bintray repositories mentions out of Gradle build (sunset is announced for those repositories) Key: KAFKA-12293 URL: https://issues.apache.org/jira/browse/KAFKA-12293 Project: Kafka Issue Type: Task Components: build Reporter: Dejan Stojadinović Assignee: Dejan Stojadinović *Intro:* [https://jfrog.com/blog/into-the-sunset-bintray-jcenter-gocenter-and-chartcenter] *Quote (from a link above):* {quote} May 1st: Bintray, JCenter, GoCenter, and ChartCenter services will no longer be available {quote} *Note:* it seems that Gradle will make some changes in order to resolve _*jcenter()*_ to _*mavenCentral()*_ by a default: [https://github.com/gradle/gradle/issues/16018] I took the liberty to assign this to myself (I already have a few gradle related commits in Kafka repository; also I have some work-in-progress PR for this issue that will be pushed asap). FYI [~ijuma] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10054: KAFKA-12283 Flaky Test RebalanceSourceConnectorsIntegrationTest#testM…
chia7712 commented on pull request #10054: URL: https://github.com/apache/kafka/pull/10054#issuecomment-773582456 @ramesh-muthusamy Could you take a look? thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 opened a new pull request #10054: KAFKA-12283 Flaky Test RebalanceSourceConnectorsIntegrationTest#testM…
chia7712 opened a new pull request #10054: URL: https://github.com/apache/kafka/pull/10054 issue: https://issues.apache.org/jira/browse/KAFKA-12283 It seems to me the new task distribution (8, 4, 4) is valid if the following conditions are true: 1. all tasks are assigned to the (single) alive worker before we re-add workers 2. the rebalance is triggered by the first re-added worker only (the rebalance revokes 8 tasks from the alive worker). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
[ https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12284: -- Assignee: (was: Chia-Ping Tsai) > Flaky Test > MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync > - > > Key: KAFKA-12284 > URL: https://issues.apache.org/jira/browse/KAFKA-12284 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470] > {quote} {{java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}} > [...] > > {{Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364) > ... 
92 more > Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists.}} > {quote} > STDOUT > {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:354) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748)}}{quote} > {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state > info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. 
Error response: \{"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}} > {{}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12283: -- Assignee: Chia-Ping Tsai > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Chia-Ping Tsai >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
[ https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12284: -- Assignee: Chia-Ping Tsai > Flaky Test > MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync > - > > Key: KAFKA-12284 > URL: https://issues.apache.org/jira/browse/KAFKA-12284 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Assignee: Chia-Ping Tsai >Priority: Critical > Labels: flaky-test > > [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470] > {quote} {{java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}} > [...] > > {{Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364) > ... 
92 more > Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists.}} > {quote} > STDOUT > {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:354) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748)}}{quote} > {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state > info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. 
Error response: \{"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}} > {{}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine commented on a change in pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API
kkonstantine commented on a change in pull request #9726: URL: https://github.com/apache/kafka/pull/9726#discussion_r570505650 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ## @@ -188,6 +188,16 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward); } +@GET +@Path("/{connector}/tasks-config") +public Map> getTasksConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, Review comment: nit: this alignment might be a bit off. We can follow the pattern of 2 tabs with first and last line on their own if that looks better I guess. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums
jsancio commented on a change in pull request #10045: URL: https://github.com/apache/kafka/pull/10045#discussion_r570507065 ## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.controller.KafkaController +import kafka.network.RequestChannel +import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.common.requests.AbstractResponse + +sealed trait MetadataSupport { + /** + * Provide a uniform way of getting to the ForwardingManager, which is a shared concept + * despite being optional when using ZooKeeper and required when using Raft + */ + val forwardingManager: Option[ForwardingManager] + + /** + * Return this instance downcast for use with ZooKeeper + * + * @param createException function to create an exception to throw + * @return this instance downcast for use with ZooKeeper + * @throws Exception if this instance is not for ZooKeeper + */ + def requireZk(createException: => Exception): ZkSupport Review comment: This comment also applies to `requireRaft`. How about `requireZkOrThrow`? There is already precedent for this naming convention in Kafka and Java. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
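The Java precedent for the `...OrThrow` naming mentioned in the comment above is `java.util.Optional.orElseThrow`. A minimal sketch of a method following that convention is shown below; `MetadataSupportSketch` and its `String` payload are hypothetical and only illustrate the naming, not the real `MetadataSupport` trait.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical illustration of the "...OrThrow" naming convention.
class MetadataSupportSketch {
    // Null when this instance is not backed by ZooKeeper (an assumption of this sketch).
    private final String zkSupport;

    MetadataSupportSketch(String zkSupport) {
        this.zkSupport = zkSupport;
    }

    // Returns the ZooKeeper support or throws the caller-supplied exception,
    // mirroring the shape of Optional.orElseThrow(Supplier).
    String requireZkOrThrow(Supplier<? extends RuntimeException> createException) {
        return Optional.ofNullable(zkSupport).orElseThrow(createException);
    }

    public static void main(String[] args) {
        MetadataSupportSketch zk = new MetadataSupportSketch("zk-support");
        System.out.println(zk.requireZkOrThrow(
            () -> new IllegalStateException("not a ZooKeeper instance")));
    }
}
```

The suffix makes the failure mode visible at the call site, which is the benefit the reviewer points to.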
[GitHub] [kafka] mimaison commented on pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API
mimaison commented on pull request #9726: URL: https://github.com/apache/kafka/pull/9726#issuecomment-773567218 Thanks @kkonstantine for the feedback. I've pushed an update. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
ableegoldman commented on a change in pull request #10000: URL: https://github.com/apache/kafka/pull/10000#discussion_r570495244 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -227,6 +230,27 @@ public void initializeIfNeeded() { } } +private void initOffsetsIfNeeded(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { Review comment: Hm...I'm not necessarily that concerned about calling `mainConsumer.committed` twice in rare cases (although maybe that would not be so good, since those rare cases happen to be those in which this is probably more likely to time out, right?) But personally, just coming into this code from the outside, it's super confusing to have two different methods for initializing the offsets. It seems more convoluted that way, to me. Also maybe I am missing some context here, but why do we call `initOffsetsIfNeeded` from `initializeIfNeeded` rather than from `completeRestoration` in the first place? We don't need to initialize main consumer offsets until it transitions to running. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r570492431 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel, // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader. // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors. val errorUnavailableListeners = requestVersion >= 6 -val topicMetadata = +val (topicMetadata, nonExistTopicMetadata) = if (authorizedTopics.isEmpty) -Seq.empty[MetadataResponseTopic] - else { -getTopicMetadata( - metadataRequest.allowAutoTopicCreation, - metadataRequest.isAllTopics, - authorizedTopics, - request.context.listenerName, - errorUnavailableEndpoints, - errorUnavailableListeners -) +(Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic]) + else +getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics, + request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) + +nonExistTopicMetadata.foreach(metadata => + try { +// Validate topic name and propagate error if failed +Topic.validate(metadata.name()) + } catch { +case e: Exception => + metadata.setErrorCode(Errors.forException(e).code) } +) + +if (nonExistTopicMetadata.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) { + val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) + autoTopicCreationManager.createTopics( +nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name())).toSet, controllerMutationQuota) Review comment: I guess we could rely on admin manager to do the validation for us. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r570492221 ## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import java.util.concurrent.ConcurrentHashMap + +import kafka.controller.KafkaController +import kafka.utils.Logging +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.message.CreateTopicsRequestData +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.requests.CreateTopicsRequest +import org.apache.kafka.common.utils.Time + +import scala.collection.Map + +trait AutoTopicCreationManager { + + def createTopics( +topicNames: Set[CreatableTopic], +controllerMutationQuota: ControllerMutationQuota + ): Unit + + def start(): Unit = {} + + def shutdown(): Unit = {} +} + +object AutoTopicCreationManager { + + def apply( +config: KafkaConfig, +metadataCache: MetadataCache, +time: Time, +metrics: Metrics, +threadNamePrefix: Option[String], +adminManager: ZkAdminManager, +controller: KafkaController, +enableForwarding: Boolean + ): AutoTopicCreationManager = { + +val channelManager = + if (enableForwarding) +Some(new BrokerToControllerChannelManager( + controllerNodeProvider = MetadataCacheControllerNodeProvider( +config, metadataCache), + time = time, + metrics = metrics, + config = config, + channelName = "autoTopicCreationChannel", + threadNamePrefix = threadNamePrefix, + retryTimeoutMs = config.requestTimeoutMs.longValue +)) + else +None +new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs) + } +} + +class AutoTopicCreationManagerImpl( + channelManager: Option[BrokerToControllerChannelManager], + adminManager: ZkAdminManager, + controller: KafkaController, + requestTimeout: Int +) extends AutoTopicCreationManager with Logging { + + private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic] + + override def start(): Unit = { +channelManager.foreach(_.start()) + } + + override def shutdown(): Unit = { +channelManager.foreach(_.shutdown()) + } + + 
override def createTopics(topics: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota): Unit = {
+val topicConfigs = topics
+ .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
Do you mean omitting the `()` in `topic.name()`?
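A side note on the filter quoted above, not something raised in the review itself: `inflightTopics` is a `java.util.concurrent.ConcurrentHashMap`, and on that class the legacy `contains(Object)` method searches values rather than keys (it behaves like `containsValue`). The minimal Java sketch below shows the difference. The class name and the `String` payload standing in for `CreatableTopic` are assumptions made for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the inflightTopics map: topic name -> topic payload.
final class InflightLookupSketch {
    static ConcurrentHashMap<String, String> inflight() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("orders", "payload-for-orders");
        return map;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = inflight();
        // contains(Object) exists for Hashtable compatibility and searches VALUES.
        System.out.println(map.contains("orders"));     // false
        // containsKey(Object) is the lookup by topic name.
        System.out.println(map.containsKey("orders"));  // true
    }
}
```

If the intent of the filter is "skip topics whose name is already in flight", a key lookup such as `containsKey` is the call that expresses it.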
[jira] [Commented] (KAFKA-12279) Kafka 2.7 stream app issue
[ https://issues.apache.org/jira/browse/KAFKA-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279097#comment-17279097 ]

prabhu biradar commented on KAFKA-12279:

Hi Sophie Blee-Goldman,
1) This error occurs on every restart after the upgrade to Kafka 2.7.
2) Yes, we are running 12 instances (6 ECS tasks, and each task runs 2 threads).
3) Below are the detailed logs, starting from the application restart.

Completed 23.2 KiB/23.2 KiB (228.6 KiB/s) with 1 file(s) remaining
Completed 23.2 KiB/23.2 KiB (228.6 KiB/s) with 1 file(s) remaining
download: s3://x/perf/kafka-client-truststore.jks to ./client.truststore.jks
Completed 256.0 KiB/20.9 MiB (1.9 MiB/s) with 1 file(s) remaining
Completed 512.0 KiB/20.9 MiB (3.7 MiB/s) with 1 file(s) remaining
Completed 768.0 KiB/20.9 MiB (5.4 MiB/s) with 1 file(s) remaining
Completed 1.0 MiB/20.9 MiB (7.2 MiB/s) with 1 file(s) remaining
Completed 1.2 MiB/20.9 MiB (8.9 MiB/s) with 1 file(s) remaining
Completed 1.5 MiB/20.9 MiB (10.5 MiB/s) with 1 file(s) remaining
Completed 1.8 MiB/20.9 MiB (12.1 MiB/s) with 1 file(s) remaining
Completed 2.0 MiB/20.9 MiB (13.8 MiB/s) with 1 file(s) remaining
Completed 2.2 MiB/20.9 MiB (15.3 MiB/s) with 1 file(s) remaining
Completed 2.5 MiB/20.9 MiB (16.9 MiB/s) with 1 file(s) remaining
Completed 2.8 MiB/20.9 MiB (18.3 MiB/s) with 1 file(s) remaining
Completed 3.0 MiB/20.9 MiB (19.9 MiB/s) with 1 file(s) remaining
Completed 3.2 MiB/20.9 MiB (21.4 MiB/s) with 1 file(s) remaining
Completed 3.5 MiB/20.9 MiB (22.8 MiB/s) with 1 file(s) remaining
Completed 3.8 MiB/20.9 MiB (24.3 MiB/s) with 1 file(s) remaining
Completed 4.0 MiB/20.9 MiB (25.8 MiB/s) with 1 file(s) remaining
Completed 4.2 MiB/20.9 MiB (27.0 MiB/s) with 1 file(s) remaining
Completed 4.5 MiB/20.9 MiB (28.4 MiB/s) with 1 file(s) remaining
Completed 4.8 MiB/20.9 MiB (29.9 MiB/s) with 1 file(s) remaining
Completed 5.0 MiB/20.9 MiB (31.1 MiB/s) with 1 file(s) remaining
Completed 5.2 MiB/20.9 MiB (32.4 MiB/s) with 1 file(s)
remaining Completed 5.5 MiB/20.9 MiB (33.7 MiB/s) with 1 file(s) remaining Completed 5.8 MiB/20.9 MiB (34.9 MiB/s) with 1 file(s) remaining Completed 6.0 MiB/20.9 MiB (36.1 MiB/s) with 1 file(s) remaining Completed 6.2 MiB/20.9 MiB (37.4 MiB/s) with 1 file(s) remaining Completed 6.5 MiB/20.9 MiB (38.7 MiB/s) with 1 file(s) remaining Completed 6.8 MiB/20.9 MiB (39.9 MiB/s) with 1 file(s) remaining Completed 7.0 MiB/20.9 MiB (41.1 MiB/s) with 1 file(s) remaining Completed 7.2 MiB/20.9 MiB (42.3 MiB/s) with 1 file(s) remaining Completed 7.5 MiB/20.9 MiB (43.5 MiB/s) with 1 file(s) remaining Completed 7.8 MiB/20.9 MiB (44.6 MiB/s) with 1 file(s) remaining Completed 8.0 MiB/20.9 MiB (45.7 MiB/s) with 1 file(s) remaining Completed 8.2 MiB/20.9 MiB (47.1 MiB/s) with 1 file(s) remaining Completed 8.5 MiB/20.9 MiB (48.1 MiB/s) with 1 file(s) remaining Completed 8.8 MiB/20.9 MiB (49.1 MiB/s) with 1 file(s) remaining Completed 9.0 MiB/20.9 MiB (50.2 MiB/s) with 1 file(s) remaining Completed 9.2 MiB/20.9 MiB (51.5 MiB/s) with 1 file(s) remaining Completed 9.5 MiB/20.9 MiB (52.3 MiB/s) with 1 file(s) remaining Completed 9.8 MiB/20.9 MiB (53.4 MiB/s) with 1 file(s) remaining Completed 10.0 MiB/20.9 MiB (54.5 MiB/s) with 1 file(s) remainingCompleted 10.2 MiB/20.9 MiB (55.3 MiB/s) with 1 file(s) remainingCompleted 10.5 MiB/20.9 MiB (56.3 MiB/s) with 1 file(s) remainingCompleted 10.8 MiB/20.9 MiB (57.4 MiB/s) with 1 file(s) remainingCompleted 11.0 MiB/20.9 MiB (58.3 MiB/s) with 1 file(s) remainingCompleted 11.2 MiB/20.9 MiB (59.3 MiB/s) with 1 file(s) remainingCompleted 11.5 MiB/20.9 MiB (60.2 MiB/s) with 1 file(s) remainingCompleted 11.8 MiB/20.9 MiB (61.1 MiB/s) with 1 file(s) remainingCompleted 12.0 MiB/20.9 MiB (62.0 MiB/s) with 1 file(s) remainingCompleted 12.2 MiB/20.9 MiB (63.0 MiB/s) with 1 file(s) remainingCompleted 12.5 MiB/20.9 MiB (63.9 MiB/s) with 1 file(s) remainingCompleted 12.8 MiB/20.9 MiB (64.7 MiB/s) with 1 file(s) remainingCompleted 13.0 MiB/20.9 MiB (65.7 
MiB/s) with 1 file(s) remainingCompleted 13.2 MiB/20.9 MiB (66.5 MiB/s) with 1 file(s) remainingCompleted 13.5 MiB/20.9 MiB (67.4 MiB/s) with 1 file(s) remainingCompleted 13.8 MiB/20.9 MiB (68.3 MiB/s) with 1 file(s) remainingCompleted 14.0 MiB/20.9 MiB (69.2 MiB/s) with 1 file(s) remainingCompleted 14.2 MiB/20.9 MiB (70.0 MiB/s) with 1 file(s) remainingCompleted 14.5 MiB/20.9 MiB (71.0 MiB/s) with 1 file(s) remainingCompleted 14.8 MiB/20.9 MiB (71.7 MiB/s) with 1 file(s) remainingCompleted 15.0 MiB/20.9 MiB (72.5 MiB/s) with 1 file(s) remainingCompleted 15.2 MiB/20.9 MiB (73.5 MiB/s) with 1 file(s) remainingCompleted 15.5 MiB/20.9 MiB (74.2 MiB/s) with 1 file(s) remainingCompleted 15.8 MiB/20.9 MiB (75.1 MiB/s) with 1 file(s) remainingCompleted 16.0 MiB/20.9 MiB (76.1 MiB/s) with 1 file(s) remainingCompleted 16.1 MiB/20.9 MiB (76.0 MiB/s) with 1 file(s) remainingComplet
[GitHub] [kafka] vvcephei commented on pull request #9107: KAFKA-5488: Add type-safe split() operator
vvcephei commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-773549603 Thanks, all, that Scala fix looks perfect to me.
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570480399

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
     // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
     val errorUnavailableListeners = requestVersion >= 6
-    val topicMetadata =
+    val (topicMetadata, nonExistTopicMetadata) =
       if (authorizedTopics.isEmpty)
-        Seq.empty[MetadataResponseTopic]
-      else {
-        getTopicMetadata(
-          metadataRequest.allowAutoTopicCreation,
-          metadataRequest.isAllTopics,
-          authorizedTopics,
-          request.context.listenerName,
-          errorUnavailableEndpoints,
-          errorUnavailableListeners
-        )
+        (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+      else
+        getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+          request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
+
+    nonExistTopicMetadata.foreach(metadata =>
+      try {
+        // Validate topic name and propagate error if failed
+        Topic.validate(metadata.name())

Review comment:
Actually after looking into the zk admin manager logic, I don't think it's necessary to do the topic validation here.
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971

## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because its endpoints map is empty. This leads to `cluster.leaderFor` on L534 returning null.

@hachikuji @cmccabe is this a change in metadata behavior, or does this test have bad assumptions?
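The behavior described in the comment above can be modeled in a few lines of plain Java. This is only a sketch of the reasoning and not the `MetadataCache` or `MetadataImage` implementation: `ListenerViewSketch`, `nodesFor`, and the map layout are hypothetical. It assumes the cluster view is built by keeping only brokers that expose an endpoint for the requested listener, so a broker registered with an empty endpoints list drops out and a leader lookup for it yields null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: brokerId -> (listenerName -> "host:port").
final class ListenerViewSketch {
    // Keep only the brokers that have an endpoint for the requested listener.
    static Map<Integer, String> nodesFor(Map<Integer, Map<String, String>> brokers, String listener) {
        Map<Integer, String> nodes = new HashMap<>();
        for (Map.Entry<Integer, Map<String, String>> broker : brokers.entrySet()) {
            String endpoint = broker.getValue().get(listener);
            if (endpoint != null) {
                nodes.put(broker.getKey(), endpoint);
            }
        }
        return nodes;
    }

    // Returns null when the leader is not part of the listener-filtered view.
    static String leaderFor(Map<Integer, String> nodes, int leaderId) {
        return nodes.get(leaderId);
    }

    // Mirrors the removed test's setup: broker 0 has one endpoint, broker 1 has none.
    static Map<Integer, Map<String, String>> exampleBrokers() {
        Map<Integer, Map<String, String>> brokers = new HashMap<>();
        Map<String, String> broker0 = new HashMap<>();
        broker0.put("PLAINTEXT", "foo:9092");
        brokers.put(0, broker0);
        brokers.put(1, new HashMap<>());
        return brokers;
    }
}
```

With the leader set to broker 1, as in the quoted test, `leaderFor` returns null in this model because broker 1 never enters the filtered view.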
[GitHub] [kafka] kkonstantine commented on a change in pull request #10053: KAFKA-10834: Remove redundant type casts in Connect
kkonstantine commented on a change in pull request #10053:
URL: https://github.com/apache/kafka/pull/10053#discussion_r570464187

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ##
@@ -236,7 +236,7 @@ public DistributedHerder(DistributedConfig config,
         this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L,
                 TimeUnit.MILLISECONDS,
-                new LinkedBlockingDeque(1),
+                new LinkedBlockingDeque<>(1),

Review comment:
same nit as above

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ##
@@ -63,7 +63,7 @@ public SourceTaskOffsetCommitter(WorkerConfig config) {
         this(config, Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(
                 SourceTaskOffsetCommitter.class.getSimpleName() + "-%d", false)),
-                new ConcurrentHashMap>());
+                new ConcurrentHashMap<>());

Review comment:
nit: the formatting got unaligned. Please check that the suggestion fixes it.
```suggestion
                new ConcurrentHashMap<>());
```

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java ##
@@ -269,8 +269,10 @@ public void testGetSetNull() throws Exception {
         final Capture> secondGetReadToEndCallback = EasyMock.newCapture();
         storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
         PowerMock.expectLastCall().andAnswer(() -> {
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
+                null, TP0_VALUE.array()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(),
+                null));

Review comment:
```suggestion
            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), null));
```

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java ##
@@ -269,8 +269,10 @@ public void testGetSetNull() throws Exception {
         final Capture> secondGetReadToEndCallback = EasyMock.newCapture();
         storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
         PowerMock.expectLastCall().andAnswer(() -> {
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
+                null, TP0_VALUE.array()));

Review comment:
```suggestion
            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, null, TP0_VALUE.array()));
```

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java ##
@@ -39,7 +39,8 @@ public void close() throws Exception {
     protected ConfigValue configValue(Map.Entry configEntry) {
         ConfigValue configValue =
-            new ConfigValue(configEntry.getKey(), configEntry.getValue(), new ArrayList<>(), new ArrayList());
+            new ConfigValue(configEntry.getKey(), configEntry.getValue(), new ArrayList<>(),
+                new ArrayList<>());

Review comment:
In this project the line width can be longer. For old lines, let's keep it like that. New lines are fine to format based on a 100-character width (I believe), but again that is not required currently.
```suggestion
            new ConfigValue(configEntry.getKey(), configEntry.getValue(), new ArrayList<>(), new ArrayList<>());
```

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ##
@@ -390,12 +390,14 @@ private void checkAndPutConnectorConfigName(String connectorName, Map T completeOrForwardRequest(FutureCallback cb, String path, String method, HttpHeaders headers, Object body,
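The changes reviewed above replace explicit type arguments with the diamond operator and drop casts on `null` where the target type already determines the generic arguments. The Java sketch below illustrates both ideas, plus one case where a cast on `null` is still required. `KeyValue`, `RedundantCastSketch`, and `describe` are hypothetical names for illustration and are not Connect or client classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical generic pair, used only to show type inference with null arguments.
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

final class RedundantCastSketch {
    static String describe(String s) { return "String"; }
    static String describe(byte[] b) { return "byte[]"; }

    static KeyValue<byte[], byte[]> nullKey(byte[] value) {
        // K and V are inferred from the return type, so `(byte[]) null` would be redundant.
        return new KeyValue<>(null, value);
    }

    public static void main(String[] args) {
        // Diamond operator: the type arguments come from the declared variable type.
        List<String> names = new ArrayList<>();
        LinkedBlockingDeque<Runnable> queue = new LinkedBlockingDeque<>(1);
        names.add("example");

        System.out.println(nullKey(new byte[] {1}).key == null);  // true
        // Here the cast is NOT redundant: a bare describe(null) is ambiguous
        // between the two overloads and does not compile.
        System.out.println(describe((byte[]) null));              // byte[]
        System.out.println(names.size() + queue.remainingCapacity());
    }
}
```

The distinction matters when cleaning up casts in bulk: a cast on `null` is only removable when no overload resolution depends on it.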
[GitHub] [kafka] cmccabe merged pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration
cmccabe merged pull request #10019: URL: https://github.com/apache/kafka/pull/10019
[GitHub] [kafka] jsancio commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
jsancio commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570367570

## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {

Review comment:
@cmccabe What do you think about splitting this functionality into two types? For example:

1. `EventQueue` is a type which is responsible for ordering events given the insertion type and deadline. This type is thread-safe but doesn't instantiate thread(s). This type exposes methods for enqueuing and dequeuing events. The dequeuing method(s) can take in a "time" parameter and poll to see if there is an event ready. The dequeue method(s) would need to return the difference between "time" and the next closest event in the queue.
2. `SingleThreadEventExecutor` is a type which spawns a thread to dequeue events from the `EventQueue`, executes the `run` or `handleException` methods of the event, and is `AutoCloseable`.
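The first half of the proposal above, a thread-safe queue that owns no thread and is polled with a caller-supplied time, can be sketched as follows. This is a minimal illustration under stated assumptions and not the `KafkaEventQueue` design: `PassiveEventQueue`, `PollResult`, and `readyAtNs` are hypothetical names, and insertion types and deadline timeouts are reduced to a single "ready at" time per event.

```java
import java.util.Optional;
import java.util.PriorityQueue;

// Hypothetical sketch: orders events by the time they become runnable,
// is safe to call from several threads, and never starts a thread itself.
final class PassiveEventQueue {
    /** Result of one poll: a ready event, or how long until the next one is ready. */
    static final class PollResult {
        final Optional<Runnable> event;
        final long waitNs; // 0 when an event is returned, Long.MAX_VALUE when the queue is empty

        PollResult(Optional<Runnable> event, long waitNs) {
            this.event = event;
            this.waitNs = waitNs;
        }
    }

    private static final class Entry implements Comparable<Entry> {
        final long readyAtNs;
        final long seq; // insertion order breaks ties between equal times
        final Runnable event;

        Entry(long readyAtNs, long seq, Runnable event) {
            this.readyAtNs = readyAtNs;
            this.seq = seq;
            this.event = event;
        }

        @Override
        public int compareTo(Entry other) {
            int byTime = Long.compare(readyAtNs, other.readyAtNs);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    private final PriorityQueue<Entry> entries = new PriorityQueue<>();
    private long nextSeq = 0;

    synchronized void enqueue(long readyAtNs, Runnable event) {
        entries.add(new Entry(readyAtNs, nextSeq++, event));
    }

    // The caller passes in the current time, so no clock or thread lives in the queue.
    synchronized PollResult poll(long nowNs) {
        Entry head = entries.peek();
        if (head == null) {
            return new PollResult(Optional.empty(), Long.MAX_VALUE);
        }
        if (head.readyAtNs <= nowNs) {
            entries.poll();
            return new PollResult(Optional.of(head.event), 0L);
        }
        return new PollResult(Optional.empty(), head.readyAtNs - nowNs);
    }
}
```

A separate executor type would loop on `poll`, run the returned event, and otherwise sleep for `waitNs`. Because the time is an argument, a test can step through the queue with fixed values and no threads.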
[GitHub] [kafka] jsancio commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
jsancio commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570462338

## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {

Review comment:
Okay. I suggested it because maybe unit tests would be easier to write since the tests would have to deal with concurrency.
[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
junrao commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181 ## File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import org.slf4j.Logger; + +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public interface EventQueue extends AutoCloseable { +interface Event { +/** + * Run the event. + */ +void run() throws Exception; + +/** + * Handle an exception that was either generated by running the event, or by the + * event queue's inability to run the event. + * + * @param e The exception. This will be a TimeoutException if the event hit + * its deadline before it could be scheduled. + * It will be a RejectedExecutionException if the event could not be + * scheduled because the event queue has already been closed. + * Otherweise, it will be whatever exception was thrown by run(). 
+ */ +default void handleException(Throwable e) {} +} + +abstract class FailureLoggingEvent implements Event { +private final Logger log; + +public FailureLoggingEvent(Logger log) { +this.log = log; +} + +@Override +public void handleException(Throwable e) { +if (e instanceof RejectedExecutionException) { +log.info("Not processing {} because the event queue is closed.", +this.toString()); +} else { +log.error("Unexpected error handling {}", this.toString(), e); +} +} + +@Override +public String toString() { +return this.getClass().getSimpleName(); +} +} + +class NoDeadlineFunction implements Function { +public static final NoDeadlineFunction INSTANCE = new NoDeadlineFunction(); + +@Override +public OptionalLong apply(OptionalLong ignored) { +return OptionalLong.empty(); +} +} + +class DeadlineFunction implements Function { +private final long deadlineNs; + +public DeadlineFunction(long deadlineNs) { +this.deadlineNs = deadlineNs; +} + +@Override +public OptionalLong apply(OptionalLong ignored) { +return OptionalLong.of(deadlineNs); +} +} + +class EarliestDeadlineFunction implements Function { +private final long newDeadlineNs; + +public EarliestDeadlineFunction(long newDeadlineNs) { +this.newDeadlineNs = newDeadlineNs; +} + +@Override +public OptionalLong apply(OptionalLong prevDeadlineNs) { +if (!prevDeadlineNs.isPresent()) { +return OptionalLong.of(newDeadlineNs); +} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) { +return prevDeadlineNs; +} else { +return OptionalLong.of(newDeadlineNs); +} +} +} + +class VoidEvent implements Event { +public final static VoidEvent INSTANCE = new VoidEvent(); + +@Override +public void run() throws Exception { +} +} + +/** + * Add an element to the front of the queue. + * + * @param event The mandatory event to prepend. + */ +default void prepend(Event event) { +enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, event); +} + +/** + * Add an element to the end of the queue. 
+ * + * @param event The event to append. + */ +default void append(Event event) { +enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, event); +} + +/** + * Add an event to the end of the queue. + * + * @param deadlineNsThe deadline for starting the event, in monotonic + *
[GitHub] [kafka] cmccabe commented on pull request #10047: MINOR: Add ClusterTool as specified in KIP-631
cmccabe commented on pull request #10047: URL: https://github.com/apache/kafka/pull/10047#issuecomment-773516371 Test failures are not related to the PR
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366

## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
Hmm, I think this was an artifact of the merge. I'll restore this test.
[GitHub] [kafka] cmccabe commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
cmccabe commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r570450458 ## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + + +public final class KafkaEventQueue implements EventQueue { +/** + * A context object that wraps events. + */ +private static class EventContext { +/** + * The caller-supplied event. + */ +private final Event event; + +/** + * How this event was inserted. + */ +private final EventInsertionType insertionType; + +/** + * The previous pointer of our circular doubly-linked list. 
+ */ +private EventContext prev = this; + +/** + * The next pointer in our circular doubly-linked list. + */ +private EventContext next = this; + +/** + * If this event is in the delay map, this is the key it is there under. + * If it is not in the map, this is null. + */ +private Long deadlineNs = null; + +/** + * The tag associated with this event. + */ +private String tag; + +EventContext(Event event, EventInsertionType insertionType, String tag) { +this.event = event; +this.insertionType = insertionType; +this.tag = tag; +} + +/** + * Insert a new node in the circularly linked list after this node. + */ +void insertAfter(EventContext other) { +this.next.prev = other; +other.next = this.next; +other.prev = this; +this.next = other; +} + +/** + * Insert a new node in the circularly linked list before this node. + */ +void insertBefore(EventContext other) { +this.prev.next = other; +other.prev = this.prev; +other.next = this; +this.prev = other; +} + +/** + * Remove this node from the circularly linked list. + */ +void remove() { +this.prev.next = this.next; +this.next.prev = this.prev; +this.prev = this; +this.next = this; +} + +/** + * Returns true if this node is the only element in its list. + */ +boolean isSingleton() { +return prev == this && next == this; +} + +/** + * Run the event associated with this EventContext. + */ +void run() throws InterruptedException { +try { +event.run(); +} catch (InterruptedException e) { +throw e; +} catch (Exception e) { +event.handleException(e); +} +} + +/** + * Complete the event associated with this EventContext with a timeout exception. + */ +void completeWithTimeout() { +completeWithException(new TimeoutException()); +} + +/** + * Complete the event associated with this EventContext with the specified + * exception. + */ +void completeWithException(Throwable t) { +event.handleException(t); +} +} + +private class EventHandler implements Runnable { +/** + * Event contexts indexed by tag. 
Events without a tag are not included here. + */ +private final Map tagToEventContext = new HashMap<>(); + +/** + * The head of the event queue. +
[GitHub] [kafka] cmccabe commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
cmccabe commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r570442555 ## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + + +public final class KafkaEventQueue implements EventQueue { Review comment: That's an interesting idea, but I'm not sure I see an advantage for this use case. We only want a single thread here; otherwise we would need locking in the controller and in the parts of the broker that use this queue. 
So the potential benefit that I can see from your proposal (allowing multiple executors) doesn't really apply here.
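To illustrate the single-thread argument in the comment above, here is a minimal sketch. It does not use KafkaEventQueue or its API; the class name SingleThreadQueueSketch and its methods are hypothetical, and a standard single-thread ExecutorService stands in for the queue. The point is only that when every event runs on one thread, the state the events mutate can be a plain field with no lock, even though many threads append events concurrently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a stand-in for a single-threaded event queue.
// This is NOT the KafkaEventQueue implementation.
class SingleThreadQueueSketch {
    // Plain field with no lock: only the queue's single thread touches it.
    private int counter = 0;

    // All events run one at a time, in submission order, on one thread.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    // Any thread may append an event; the event itself runs on the queue thread.
    void appendIncrement() {
        queue.execute(() -> counter++);
    }

    // Read the state by enqueueing one last event behind all earlier ones.
    int drainAndGet() throws Exception {
        int result = queue.submit(() -> counter).get();
        queue.shutdown();
        return result;
    }

    // Start several producer threads that append events concurrently.
    static int runDemo(int producers, int eventsPerProducer) {
        try {
            SingleThreadQueueSketch sketch = new SingleThreadQueueSketch();
            Thread[] threads = new Thread[producers];
            for (int i = 0; i < producers; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < eventsPerProducer; j++) {
                        sketch.appendIncrement();
                    }
                });
                threads[i].start();
            }
            // Wait until every producer has finished appending.
            for (Thread t : threads) {
                t.join();
            }
            return sketch.drainAndGet();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // 4 producers x 1000 events each; no increment is lost.
        System.out.println(runDemo(4, 1000)); // prints 4000
    }
}
```

With a second executor thread, counter++ would become a data race and the handlers would need a lock or an atomic type, which is the cost the comment is trying to avoid.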
[jira] [Resolved] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
[ https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-7271. -- Resolution: Duplicate > Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers > --- > > Key: KAFKA-7271 > URL: https://issues.apache.org/jira/browse/KAFKA-7271 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: John Roesler >Priority: Blocker > Fix For: 3.0.0 > > > Fix in the oldest branch that ignores the test and cherry-pick forward. -- This message was sent by Atlassian Jira (v8.3.4#803005)