Re: [DISCUSS] KIP-860: Add client-provided option to guard against unintentional replication factor change during partition reassignments
> [...] out, though, is what the behavior is when the server does not
> support this new option. The simplest thing to do would be for the
> client to throw UnsupportedVersionException with an exception message
> indicating what the problem is. Then the caller could catch this and
> re-try the call without the flag (or give up, as appropriate?)
>
> The other option is to continue on but not actually protect the
> replication factor. If we do this, at minimum we'd need to rename the
> flag to something like "try to protect replication factor" to make it
> clear that it's best-effort.
>
> It's sort of debatable which way is better. In principle the UVE sounds
> nicer, but in practice maybe the other behavior is best? I suspect most
> systems would turn around and retry without the flag in the event of a
> UVE...
>
> best,
> Colin
>
> On Thu, Aug 4, 2022, at 13:37, Vikas Singh wrote:
> > Thanks Stanislav for the KIP. Seems like a reasonable proposal,
> > preventing users from accidentally altering the replica set under
> > certain conditions. I have a couple of comments:
> >
> >> In the case of an already-reassigning partition being reassigned
> >> again, the validation compares the targetReplicaSet size of the
> >> reassignment to the targetReplicaSet size of the new reassignment
> >> and throws if those differ.
> > Can you add more detail to this, or clarify what targetReplicaSet is
> > (e.g. why not sourceReplicaSet?) and how the target replica set will
> > be calculated?
> >
> > And what about the reassign partitions CLI? Do we want to expose the
> > option there too?
> >
> > Cheers,
> > Vikas
> >
> > On Thu, Jul 28, 2022 at 1:59 AM Stanislav Kozlovski
> > <stanis...@confluent.io> wrote:
> >
> >> Hey all,
> >>
> >> I'd like to start a discussion on a proposal to help API users avoid
> >> inadvertently increasing the replication factor of a topic through
> >> the alter partition reassignments API. The KIP describes two fairly
> >> easy-to-hit race conditions in which this can happen.
> >>
> >> The KIP itself is pretty simple, yet has a couple of alternatives
> >> that can help solve the same problem. I would appreciate thoughts
> >> from the community on how you think we should proceed, and whether
> >> the proposal makes sense in the first place.
> >>
> >> Thanks!
> >>
> >> KIP:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-860%3A+Add+client-provided+option+to+guard+against+replication+factor+change+during+partition+reassignments
> >> JIRA: https://issues.apache.org/jira/browse/KAFKA-14121
> >>
> >> --
> >> Best,
> >> Stanislav

--
Best,
Stanislav
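For illustration, the UVE fallback discussed above could look roughly like this on the caller side. This is a sketch only: the commented-out option setter is hypothetical, since the KIP had not settled on an API at this point.

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class GuardedReassign {
    static void reassign(Admin admin,
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments)
            throws ExecutionException, InterruptedException {
        AlterPartitionReassignmentsOptions opts =
                new AlterPartitionReassignmentsOptions();
        // opts.protectReplicationFactor(true); // hypothetical option; no such API exists yet
        try {
            admin.alterPartitionReassignments(reassignments, opts).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnsupportedVersionException) {
                // Broker too old to know the option: retry without the
                // guard, accepting that the RF is no longer protected.
                admin.alterPartitionReassignments(reassignments).all().get();
            } else {
                throw e;
            }
        }
    }
}
{code}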
Re: [DISCUSS] KIP-860: Add client-provided option to guard against unintentional replication factor change during partition reassignments
Thanks Stanislav for the KIP. Seems like a reasonable proposal,
preventing users from accidentally altering the replica set under
certain conditions. I have a couple of comments:

> In the case of an already-reassigning partition being reassigned
> again, the validation compares the targetReplicaSet size of the
> reassignment to the targetReplicaSet size of the new reassignment and
> throws if those differ.
Can you add more detail to this, or clarify what targetReplicaSet is
(e.g. why not sourceReplicaSet?) and how the target replica set will be
calculated?

And what about the reassign partitions CLI? Do we want to expose the
option there too?

Cheers,
Vikas

On Thu, Jul 28, 2022 at 1:59 AM Stanislav Kozlovski wrote:
> Hey all,
>
> I'd like to start a discussion on a proposal to help API users avoid
> inadvertently increasing the replication factor of a topic through
> the alter partition reassignments API. The KIP describes two fairly
> easy-to-hit race conditions in which this can happen.
>
> The KIP itself is pretty simple, yet has a couple of alternatives that
> can help solve the same problem. I would appreciate thoughts from the
> community on how you think we should proceed, and whether the proposal
> makes sense in the first place.
>
> Thanks!
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-860%3A+Add+client-provided+option+to+guard+against+replication+factor+change+during+partition+reassignments
> JIRA: https://issues.apache.org/jira/browse/KAFKA-14121
>
> --
> Best,
> Stanislav
Re: [DISCUSS] KIP-660: Pluggable ReplicaPlacer
Hi Mickael,

It's a nice proposal. It's appealing to have a pluggable way to override
default Kafka placement decisions, and the motivation section lists some
of them. Here are a few comments:

* The motivation section has "When adding brokers to a cluster, Kafka
currently does not necessarily place new partitions on new brokers". I
am not sure how valuable doing this will be. A newly created Kafka topic
takes time to reach the same usage level as existing topics, say because
the topic is created by a new workload that is being onboarded, or the
expansion was done to relieve disk pressure on existing nodes, etc.
While new topics catch up to the existing workload, the new brokers are
not sharing equal load in the cluster, which probably defeats the
purpose of adding new brokers. In addition, clustering new topics like
this on new brokers has implications from a fault-domain perspective. A
reasonable way to approach it is to indeed use CruiseControl to move
things around so that the newly added nodes become immediately involved
and share cluster load.

* Regarding "When administrators want to remove brokers from a cluster,
there is no way to prevent Kafka from placing partitions on them", this
is indeed an issue. I would argue that this is needed by everyone and
should be part of Kafka, instead of being implemented as part of a
plugin interface by multiple teams.

* For "When some brokers are near their storage/throughput limit, Kafka
could avoid putting new partitions on them", while this can help relieve
short-term overload, I think again the correct solution here is
something like CruiseControl, where the system is monitored and things
are moved around to maintain a balanced cluster. A new topic will not
take any disk space, so placing it anywhere normally isn't going to add
to the storage overload. Similar to the previous case, maybe a mechanism
in Kafka to put nodes in a quarantine state is a better way to approach
this.

In terms of the proposed api, I have a couple of comments:

* It is not clear if the proposal applies to partitions of new topics or
also to the addition of partitions to an existing topic. Explicitly
stating that will be helpful.

* Regarding the part "To address the use cases identified in the
motivation section, some knowledge about the current state of the
cluster is necessary. Details whether a new broker has just been added
or is being decommissioned are not part of the cluster metadata.
Therefore such knowledge has to be provided via an external means to the
ReplicaPlacer, for example via the configuration": it's not clear how
this will be done. If I have to implement this interface, it will be
helpful to have clear guidance/examples here which hopefully tie to the
use cases in the motivation section. It also allows us to figure out if
the proposed interface is complete, and helps future implementers of the
interface.

A couple of minor comments:

* The KIP is not listed on the main KIP page
(https://cwiki-test.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
Can you please add it there.

* The page has "This is especially true for the 4 scenarios listed in
the Motivation section", but there are only 3 scenarios listed.

Regards,
Vikas

On Tue, May 3, 2022 at 5:51 PM Colin McCabe wrote:

> Hi Mickael,
>
> We did discuss this earlier, and I remember not being too enthusiastic
> about a pluggable policy here :)
>
> There have been several changes to the placement code in the last few
> weeks. (These are examples of the kind of changes that are impossible
> to do once an API is established, by the way.) Can you please revise
> the KIP to take these into account?
>
> I'd also like to understand a little bit better why we need this API
> when we have the explicit placement API for createTopics and
> createPartitions. Can you give me a few scenarios where the manual
> placement API would be insufficient?
>
> best,
> Colin
>
>
> On Mon, May 2, 2022, at 09:28, Mickael Maison wrote:
> > Hi,
> >
> > If there are no further comments, I'll start a vote in the next few
> > days.
> >
> > Thanks,
> > Mickael
> >
> > On Wed, Mar 30, 2022 at 3:51 AM Luke Chen wrote:
> >>
> >> Hi Mickael,
> >>
> >> Thanks for the update.
> >> It answered my questions!
> >>
> >> Thank you.
> >> Luke
> >>
> >> On Wed, Mar 30, 2022 at 12:09 AM Mickael Maison
> >> <mickael.mai...@gmail.com> wrote:
> >>
> >> > Hi Luke,
> >> >
> >> > Thanks for the feedback.
> >> >
> >> > 1. Thanks, fixed!
> >> > 2. Yes that's right. It's the same behavior for topic policies
> >> > 3. I've added details about how the mentioned scenarios could be
> >> > addressed. The information required to make such decisions is not
> >> > part of the Kafka cluster metadata, so an external input is
> >> > necessary. This KIP does not propose a specific mechanism for
> >> > doing it.
> >> >
> >> > I hope this answers your questions.
> >> >
> >> > Thanks,
> >> > Mickael
> >> >
> >> >
> >> > On Tue, Mar 29, 2022 at 5:42 PM Mickael Maison
> >> > <mickael.mai...@gmail.com> >
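For reference, a pluggable placer along the lines discussed in this thread could be shaped roughly as follows. This is a hypothetical sketch; the interface actually proposed in KIP-660 may differ in name, package, and parameters.

{code:java}
import java.util.List;

import org.apache.kafka.common.Configurable;

/**
 * Hypothetical sketch of a pluggable replica placer; see the KIP-660
 * page for the authoritative definition.
 */
public interface ReplicaPlacer extends Configurable {
    /**
     * Choose broker ids for each new partition.
     *
     * @param numPartitions     number of partitions to place
     * @param replicationFactor replicas per partition
     * @param brokers           ids of live brokers; per the discussion,
     *                          "new" vs "decommissioning" status would
     *                          have to arrive via configure(), since it
     *                          is not part of cluster metadata
     * @return one list of replica broker ids per partition
     */
    List<List<Integer>> place(int numPartitions,
                              short replicationFactor,
                              List<Integer> brokers);
}
{code}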
permission to create KIP
Hi,

Sending this mail as per
https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals
to request permission to create KIPs and contribute to AK.

Confluence wiki id: vikasconfluent
Jira id: vikasconfluent

Thanks,
Vikas
[jira] [Created] (KAFKA-13517) Add ConfigurationKeys to ConfigResource class
Vikas Singh created KAFKA-13517:
-----------------------------------

             Summary: Add ConfigurationKeys to ConfigResource class
                 Key: KAFKA-13517
                 URL: https://issues.apache.org/jira/browse/KAFKA-13517
             Project: Kafka
          Issue Type: Improvement
          Components: clients
    Affects Versions: 3.0.0, 2.8.1
            Reporter: Vikas Singh
            Assignee: Vikas Singh
             Fix For: 2.8.1


A list of {{ConfigResource}} objects is passed as an argument to the
{{AdminClient::describeConfigs}} api to indicate the entities whose
configuration to fetch. The {{ConfigResource}} class is made up of two
fields: the name and the type of the entity. Kafka returns *all*
configurations for the entities provided to the admin client api.

This admin api in turn uses the {{DescribeConfigsRequest}} kafka api to
get the configuration for the entities in question. In addition to the
name and type of the entity whose configuration to get, the Kafka
{{DescribeConfigsResource}} structure also lets users provide a
{{ConfigurationKeys}} list, which allows fetching only the
configurations that are needed. However, this field isn't exposed in the
{{ConfigResource}} class used by AdminClient, so users of AdminClient
have no way to ask for specific configurations: the api always returns
*all* of them. Users of {{AdminClient::describeConfigs}} must then go
over the returned list and filter out the config keys they are
interested in. This results in boilerplate code for every caller, in
addition to being a wasteful use of resources. It becomes painful in the
large-cluster case, where fetching one configuration of every topic
means fetching all configurations of every topic, which can be huge in
size.

Creating this Jira to add the same field (i.e. {{ConfigurationKeys}}) to
the {{ConfigResource}} structure, bringing it to parity with the
{{DescribeConfigsResource}} Kafka api structure. There should be no
backward compatibility issue, as the field will be optional and will
behave the same way if it is not specified (i.e. by passing null to the
backend kafka api).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
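For illustration, the client-side filtering boilerplate described in KAFKA-13517 above looks roughly like this (a sketch that assumes topic-type {{ConfigResource}} entries and a single config of interest, {{retention.ms}}):

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionLookup {
    // Fetches *all* configs for each topic, then discards everything but
    // retention.ms -- the boilerplate this issue wants to eliminate.
    static void printRetention(Admin admin, Collection<ConfigResource> topics)
            throws ExecutionException, InterruptedException {
        Map<ConfigResource, Config> all = admin.describeConfigs(topics).all().get();
        all.forEach((resource, config) -> {
            ConfigEntry entry = config.get("retention.ms");
            System.out.println(resource.name() + " retention.ms = "
                    + (entry == null ? "<unset>" : entry.value()));
        });
    }
}
{code}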
[jira] [Created] (KAFKA-13432) ApiException should provide a way to capture stacktrace
Vikas Singh created KAFKA-13432:
-----------------------------------

             Summary: ApiException should provide a way to capture stacktrace
                 Key: KAFKA-13432
                 URL: https://issues.apache.org/jira/browse/KAFKA-13432
             Project: Kafka
          Issue Type: Improvement
          Components: core
            Reporter: Vikas Singh


ApiException doesn't fill in the stacktrace; it overrides
{{fillInStackTrace}} to make it a no-op. Here is the code:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java#L45,L49

However, there are times when a full stacktrace is helpful in finding
out what went wrong on the client side. We should provide a way to make
this behavior configurable, so that if an error is hit repeatedly, we
can switch the behavior and find out which code is causing it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
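One possible shape for the configurable behavior requested in KAFKA-13432 above (a sketch only, not the actual Kafka implementation; the static flag and its setter are hypothetical):

{code:java}
/**
 * Sketch only -- not the real org.apache.kafka.common.errors class.
 */
public class ApiException extends RuntimeException {
    private static volatile boolean captureStackTrace = false; // hypothetical flag

    public ApiException(String message) {
        super(message);
    }

    /** Flip on when debugging repeated client-side errors. */
    public static void setCaptureStackTrace(boolean capture) {
        captureStackTrace = capture;
    }

    @Override
    public Throwable fillInStackTrace() {
        // Kafka's current code unconditionally returns 'this', skipping
        // the expensive stack walk; this variant makes capture opt-in.
        return captureStackTrace ? super.fillInStackTrace() : this;
    }
}
{code}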
[jira] [Created] (KAFKA-10531) KafkaBasedLog can sleep for negative values
Vikas Singh created KAFKA-10531:
-----------------------------------

             Summary: KafkaBasedLog can sleep for negative values
                 Key: KAFKA-10531
                 URL: https://issues.apache.org/jira/browse/KAFKA-10531
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.6.0
            Reporter: Vikas Singh
             Fix For: 2.6.1


{{time.milliseconds}} is not monotonic, so this code can throw
{{java.lang.IllegalArgumentException: timeout value is negative}}:

{code:java}
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
    partitionInfos = consumer.partitionsFor(topic);
    Utils.sleep(Math.min(time.milliseconds() - started, 1000));
}
{code}

We need to check for a negative value before sleeping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
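A minimal sketch of the guard suggested in KAFKA-10531 above: the same loop, with the sleep clamped so it can never go negative.

{code:java}
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
    partitionInfos = consumer.partitionsFor(topic);
    // Math.max(0, ...) ensures a backwards clock jump can never
    // produce a negative sleep timeout.
    Utils.sleep(Math.max(0, Math.min(time.milliseconds() - started, 1000)));
}
{code}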
[jira] [Created] (KAFKA-10175) MetadataCache::getCluster returns null for offline replicas
Vikas Singh created KAFKA-10175:
-----------------------------------

             Summary: MetadataCache::getCluster returns null for offline replicas
                 Key: KAFKA-10175
                 URL: https://issues.apache.org/jira/browse/KAFKA-10175
             Project: Kafka
          Issue Type: Bug
            Reporter: Vikas Singh


This line in the code returns null for offline replicas:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/MetadataCache.scala#L272

The reason is that the `map(node)` part uses `aliveNodes` to build each
`Node` object, defaulting to `null` when the broker is not alive.
Offline replicas thus always end up as null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (KAFKA-9370) Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress
Vikas Singh created KAFKA-9370:
--------------------------------

             Summary: Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress
                 Key: KAFKA-9370
                 URL: https://issues.apache.org/jira/browse/KAFKA-9370
             Project: Kafka
          Issue Type: Bug
            Reporter: Vikas Singh


`KafkaApis::handleCreatePartitionsRequest` returns
`INVALID_TOPIC_EXCEPTION` if the topic is being deleted. Change it to
return `UNKNOWN_TOPIC_OR_PARTITION` instead. After the delete topic api
returns, the client should see the topic as deleted; the fact that we
process the deletion in the background shouldn't have any visible
impact.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (KAFKA-9330) Calling AdminClient.close in the AdminClient completion callback causes deadlock
Vikas Singh created KAFKA-9330:
--------------------------------

             Summary: Calling AdminClient.close in the AdminClient completion callback causes deadlock
                 Key: KAFKA-9330
                 URL: https://issues.apache.org/jira/browse/KAFKA-9330
             Project: Kafka
          Issue Type: Bug
            Reporter: Vikas Singh


The close method calls `Thread.join` to wait for the AdminClient thread
to die, but that never happens when the thread calling join *is* the
AdminClient thread. The thread then blocks forever, a deadlock in which
it waits for itself to die.

`AdminClient.close` should check whether the thread calling close is the
current thread and, if so, skip the join. The thread will notice the
close condition in its main loop and exit.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
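The fix suggested in KAFKA-9330 above could be sketched like this (illustrative only; the class and field names are hypothetical, not the actual KafkaAdminClient internals):

{code:java}
public class AdminCloseSketch {
    private final Thread adminThread;
    private volatile boolean closing = false;

    public AdminCloseSketch(Thread adminThread) {
        this.adminThread = adminThread;
    }

    public void close() throws InterruptedException {
        closing = true; // the main loop polls this flag and exits
        if (Thread.currentThread() != adminThread) {
            adminThread.join();
        }
        // else: close() was invoked from the admin thread itself (e.g.
        // inside a completion callback); joining here would wait forever
        // for our own thread to die, so we let the main loop wind down.
    }
}
{code}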
[jira] [Created] (KAFKA-9329) KafkaController::replicasAreValid should return error
Vikas Singh created KAFKA-9329:
--------------------------------

             Summary: KafkaController::replicasAreValid should return error
                 Key: KAFKA-9329
                 URL: https://issues.apache.org/jira/browse/KAFKA-9329
             Project: Kafka
          Issue Type: Bug
            Reporter: Vikas Singh


The method currently returns a boolean indicating whether the replicas
are valid or not, so the failure case loses any context on *why* they
are not valid. We should return the error condition along with
success/failure, and perhaps rename the method to something like
`validateReplicas` too.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
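In Java terms, the shape suggested in KAFKA-9329 above could look like this (a hypothetical sketch; the real method lives in Scala inside KafkaController):

{code:java}
import java.util.List;
import java.util.Optional;

interface ReassignmentValidation {
    /**
     * @return Optional.empty() when the replicas are valid; otherwise a
     *         human-readable reason that the caller can surface, rather
     *         than a context-free boolean.
     */
    Optional<String> validateReplicas(List<Integer> replicas);
}
{code}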
[jira] [Created] (KAFKA-9265) kafka.log.Log instances are leaking on log delete
Vikas Singh created KAFKA-9265:
--------------------------------

             Summary: kafka.log.Log instances are leaking on log delete
                 Key: KAFKA-9265
                 URL: https://issues.apache.org/jira/browse/KAFKA-9265
             Project: Kafka
          Issue Type: Bug
            Reporter: Vikas Singh


KAFKA-8448 fixed a similar leak: {{Log}} objects were being held by the
{{PeriodicProducerExpirationCheck}} callback scheduled on a
{{ScheduledExecutor}}. The fix there was to change the policy of the
{{ScheduledExecutor}} to remove scheduled tasks when they get canceled
(by calling {{setRemoveOnCancelPolicy(true)}}).

That works when a log is closed via the {{close()}} method. But when a
log is deleted, either because the topic gets deleted or because a
rebalancing operation moves the replica away from the broker, the
{{delete()}} operation is invoked instead. {{Log.delete()}} doesn't
cancel the pending scheduled task, which leaks the Log instance. The fix
is to cancel the scheduled task in {{Log.delete()}} too.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
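To illustrate the mechanics behind KAFKA-9265 above, here is a standalone sketch (not Kafka's code) showing that the remove-on-cancel policy from KAFKA-8448 only helps when the task is actually cancelled:

{code:java}
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LeakDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(true); // the KAFKA-8448 fix

        ScheduledFuture<?> task = executor.scheduleAtFixedRate(
                () -> { /* stand-in for the periodic producer-expiration check */ },
                10, 10, TimeUnit.MINUTES);

        // close() cancels the task, so the policy drops it from the queue
        // and it stops pinning its target object. A delete() path that
        // skips this cancel keeps the task (and the Log it references)
        // alive -- the leak this issue fixes.
        task.cancel(false);
        executor.shutdown();
    }
}
{code}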
[jira] [Created] (KAFKA-8988) Replace CreatePartitions request/response with automated protocol
Vikas Singh created KAFKA-8988:
--------------------------------

             Summary: Replace CreatePartitions request/response with automated protocol
                 Key: KAFKA-8988
                 URL: https://issues.apache.org/jira/browse/KAFKA-8988
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Vikas Singh
            Assignee: Vikas Singh




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Resolved] (KAFKA-8457) Remove Log dependency from Replica
     [ https://issues.apache.org/jira/browse/KAFKA-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vikas Singh resolved KAFKA-8457.
--------------------------------
    Resolution: Fixed

Fixed in commit 57baa4079d9fc14103411f790b9a025c9f2146a4

> Remove Log dependency from Replica
> ----------------------------------
>
>                 Key: KAFKA-8457
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8457
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Vikas Singh
>            Assignee: Vikas Singh
>            Priority: Major
>
> A partition can have one log but many replicas. Putting the log in the
> replica meant we had to have an if-else each time we needed to access
> the log. Moving the log out of the replica and into the partition makes
> the code simpler, and it also helps in testing, where mocks get
> simplified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-8525) Make log in Partition non-optional
Vikas Singh created KAFKA-8525:
--------------------------------

             Summary: Make log in Partition non-optional
                 Key: KAFKA-8525
                 URL: https://issues.apache.org/jira/browse/KAFKA-8525
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.3.0
            Reporter: Vikas Singh


While moving the log out of the replica and into the partition as part
of KAFKA-8457 cleaned up a bunch of code by removing checks like "if
(!localReplica) throw", there are still a couple of additional cleanups
that can be done:

# The log object in Partition can be made non-optional, as it doesn't
make sense to have a partition without a log. Here is a comment on the
PR for KAFKA-8457: {{I think it shouldn't be possible to have a
Partition without a corresponding Log. Once this is merged, I think we
can look into whether we can replace the optional log field in this
class with a concrete instance.}}
# The LocalReplica class can be removed, simplifying the replica class.
Here is another comment on the PR: {{it might be possible to turn
Replica into a trait and then let Log implement it directly. Then we
could get rid of LocalReplica. That would also help us clean up
RemoteReplica, since the usage of LogOffsetMetadata only makes sense for
the local replica.}}

Creating this JIRA to track these refactoring tasks for the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-8457) Remove Log dependency from Replica
Vikas Singh created KAFKA-8457:
--------------------------------

             Summary: Remove Log dependency from Replica
                 Key: KAFKA-8457
                 URL: https://issues.apache.org/jira/browse/KAFKA-8457
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Vikas Singh
            Assignee: Vikas Singh


A partition can have one log but many replicas. Putting the log in the
replica meant we had to have an if-else each time we needed to access
the log. Moving the log out of the replica and into the partition makes
the code simpler, and it also helps in testing, where mocks get
simplified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error
     [ https://issues.apache.org/jira/browse/KAFKA-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vikas Singh resolved KAFKA-8341.
--------------------------------
    Resolution: Fixed

Fixed in commit 46a02f3231cd6d340c622636159b9f59b4b3cb6e

> AdminClient should retry coordinator lookup after NOT_COORDINATOR error
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-8341
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8341
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Vikas Singh
>            Priority: Major
>
> If a group operation (e.g. DescribeGroup) fails because the coordinator
> has moved, the AdminClient should look up the coordinator before
> retrying the operation. Currently we either fail or just retry anyway.
> This is similar in some ways to controller rediscovery after getting
> NOT_CONTROLLER errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-8398) NPE when unmapping files after moving log directories using AlterReplicaLogDirs
Vikas Singh created KAFKA-8398:
--------------------------------

             Summary: NPE when unmapping files after moving log directories using AlterReplicaLogDirs
                 Key: KAFKA-8398
                 URL: https://issues.apache.org/jira/browse/KAFKA-8398
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.2.0
            Reporter: Vikas Singh
         Attachments: AlterReplicaLogDirs.txt


The NPE occurs after the AlterReplicaLogDirs command completes
successfully, when unmapping older regions. The relevant part of the log
is in the attached file. Here is the stacktrace (which is repeated for
both index files):

{code:java}
[2019-05-20 14:08:13,999] ERROR Error unmapping index /tmp/kafka-logs/test-0.567a0d8ff88b45ab95794020d0b2e66f-delete/.index (kafka.log.OffsetIndex)
java.lang.NullPointerException
    at org.apache.kafka.common.utils.MappedByteBuffers.unmap(MappedByteBuffers.java:73)
    at kafka.log.AbstractIndex.forceUnmap(AbstractIndex.scala:318)
    at kafka.log.AbstractIndex.safeForceUnmap(AbstractIndex.scala:308)
    at kafka.log.AbstractIndex.$anonfun$closeHandler$1(AbstractIndex.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.log.AbstractIndex.closeHandler(AbstractIndex.scala:257)
    at kafka.log.AbstractIndex.deleteIfExists(AbstractIndex.scala:226)
    at kafka.log.LogSegment.$anonfun$deleteIfExists$6(LogSegment.scala:597)
    at kafka.log.LogSegment.delete$1(LogSegment.scala:585)
    at kafka.log.LogSegment.$anonfun$deleteIfExists$5(LogSegment.scala:597)
    at kafka.utils.CoreUtils$.$anonfun$tryAll$1(CoreUtils.scala:115)
    at kafka.utils.CoreUtils$.$anonfun$tryAll$1$adapted(CoreUtils.scala:114)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.utils.CoreUtils$.tryAll(CoreUtils.scala:114)
    at kafka.log.LogSegment.deleteIfExists(LogSegment.scala:599)
    at kafka.log.Log.$anonfun$delete$3(Log.scala:1762)
    at kafka.log.Log.$anonfun$delete$3$adapted(Log.scala:1762)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at kafka.log.Log.$anonfun$delete$2(Log.scala:1762)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.log.Log.maybeHandleIOException(Log.scala:2013)
    at kafka.log.Log.delete(Log.scala:1759)
    at kafka.log.LogManager.deleteLogs(LogManager.scala:761)
    at kafka.log.LogManager.$anonfun$deleteLogs$6(LogManager.scala:775)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)