[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357817#comment-16357817 ] Gwen Shapira edited comment on KAFKA-2967 at 2/9/18 1:45 AM: - Having used both, I have to say that I find Asciidoc's version of markdown much easier to use. The available plugins (especially for Atom) are better too. [~vikgamov] is a Gradle expert (or at least an official reviewer of an O'Reilly Gradle book), and given our Gradle/doc integration pains in the past, it is good to have a community volunteer with the necessary expertise to maintain the integration. How does everyone else feel about Asciidoc vs RST? (Sorry for bikeshedding, [~ewencp], but I think it is worthwhile to think a bit about a decision we'll have to live with for the coming years.) was (Author: gwenshap): Having used both, I have to say that I find Asciidoc's version of markdown much easier to use. The available plugins (especially for Atom) are better too. [~vikgamov] is a Gradle expert (or at least an official reviewer of an O'Reilly Gradle book), and given our Gradle/doc integration pains in the past, it is good to have a community volunteer with the necessary expertise to maintain the integration. How does everyone else feel about Asciidoc vs RST? > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug > Reporter: Gwen Shapira > Assignee: Gwen Shapira > Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML; we can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during the build using the Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357817#comment-16357817 ] Gwen Shapira commented on KAFKA-2967: - Having used both, I have to say that I find Asciidoc's version of markdown much easier to use. The available plugins (especially for Atom) are better too. [~vikgamov] is a Gradle expert (or at least an official reviewer of an O'Reilly Gradle book), and given our Gradle/doc integration pains in the past, it is good to have a community volunteer with the necessary expertise to maintain the integration. How does everyone else feel about Asciidoc vs RST? > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug > Reporter: Gwen Shapira > Assignee: Gwen Shapira > Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML; we can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during the build using the Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6547) group offset reset and begin_offset ignored/no effect
[ https://issues.apache.org/jira/browse/KAFKA-6547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357807#comment-16357807 ] Jeff Widman commented on KAFKA-6547: What is the specific command you are running? How do you know there is no effect? Simply because the consumer is later unable to poll? How do you know it's not a broken consumer? > group offset reset and begin_offset ignored/no effect > - > > Key: KAFKA-6547 > URL: https://issues.apache.org/jira/browse/KAFKA-6547 > Project: Kafka > Issue Type: Bug > Components: offset manager > Affects Versions: 1.0.0 > Environment: ubuntu 16, java 1.8 > Reporter: Dan > Priority: Major > Fix For: 0.11.0.2 > > > Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest > or anything> has no effect in 1.0. When my group client connects and requests > a specific offset, or the earliest, there is no effect and the consumer is unable > to poll, so all messages, even new ones, are ignored. > I installed 0.11 and these problems do not manifest. > I'm unfamiliar with the internals and put the offset manager as the possible > component, but that's a guess. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
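As a quick way to answer Jeff's first two questions: the group's committed offset can be read back from the broker directly, independent of whether the consumer can poll. A minimal sketch against the Java consumer API (the bootstrap address, group, topic, and partition here are placeholders, not taken from the report):

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object CheckCommittedOffset extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "my-group") // placeholder: the group that was reset
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  try {
    val tp = new TopicPartition("test-topic", 0) // placeholder topic/partition
    // committed() returns null when the group has no offset for the partition
    val committed = Option(consumer.committed(tp))
    println(s"committed offset for $tp: ${committed.map(_.offset)}")
  } finally consumer.close()
}
{code}

If the offset reported here matches what --reset-offsets wrote, the problem is on the consuming side rather than in the offset manager.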
[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357798#comment-16357798 ] Vik Gamov edited comment on KAFKA-2967 at 2/9/18 1:27 AM: -- Guys, have we ever considered using AsciiDoc [http://asciidoctor.org/docs/what-is-asciidoc/] for writing documentation rather than RST or Markdown? There are a bunch of well-known projects that use AsciiDoc [http://asciidoctor.org/docs/what-is-asciidoc/#who-s-using-asciidoc] (like the Git documentation and the Neo4j database docs; GitHub supports it too). It's very powerful and suitable for writing complex text (like books): O'Reilly uses it for book authors, and there are plugins for Gradle to generate HTML5, PDF, and even mobile-optimized formats like EPUB and MOBI. It has a wide range of integrations (including Gradle and Maven) [http://asciidoctor.org/docs/#references-and-developer-resources]. Here is how it differs from Markdown (for example): [http://asciidoctor.org/docs/user-manual/#compared-to-markdown]. Thank you was (Author: vikgamov): Guys, have we ever considered using AsciiDoc [http://asciidoctor.org/docs/what-is-asciidoc/] for writing documentation rather than RST or Markdown? It's very powerful and suitable for writing complex text (like books): O'Reilly uses it for book authors, and there are plugins for Gradle to generate HTML5, PDF, and even mobile-optimized formats like EPUB and MOBI. It has a wide range of integrations (including Gradle and Maven) [http://asciidoctor.org/docs/#references-and-developer-resources]. Here is how it differs from Markdown (for example): http://asciidoctor.org/docs/user-manual/#compared-to-markdown. Thank you > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug > Reporter: Gwen Shapira > Assignee: Gwen Shapira > Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML; we can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during the build using the Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357798#comment-16357798 ] Vik Gamov commented on KAFKA-2967: -- Guys, have we ever considered using AsciiDoc [http://asciidoctor.org/docs/what-is-asciidoc/] for writing documentation rather than RST or Markdown? It's very powerful and suitable for writing complex text (like books): O'Reilly uses it for book authors, and there are plugins for Gradle to generate HTML5, PDF, and even mobile-optimized formats like EPUB and MOBI. It has a wide range of integrations (including Gradle and Maven) [http://asciidoctor.org/docs/#references-and-developer-resources]. Here is how it differs from Markdown (for example): http://asciidoctor.org/docs/user-manual/#compared-to-markdown. Thank you > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug > Reporter: Gwen Shapira > Assignee: Gwen Shapira > Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML; we can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during the build using the Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize
[ https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6501. Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 > Add test to verify markPartitionsForTruncation after fetcher thread pool > resize > > > Key: KAFKA-6501 > URL: https://issues.apache.org/jira/browse/KAFKA-6501 > Project: Kafka > Issue Type: Sub-task >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > Follow-on task from KAFKA-6242 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize
[ https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357749#comment-16357749 ] ASF GitHub Bot commented on KAFKA-6501: --- hachikuji closed pull request #4539: KAFKA-6501: Dynamic broker config tests updates and metrics fix URL: https://github.com/apache/kafka/pull/4539 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 144632c6c5d..8a17528bfb7 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
@@ -40,6 +39,10 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")

+  val RequestQueueSizeMetric = "RequestQueueSize"
+  val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ProcessorMetricTag = "processor"
+
   def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled

   sealed trait BaseRequest
@@ -241,15 +244,16 @@ object RequestChannel extends Logging {
 }

 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+  import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()

-  newGauge("RequestQueueSize", new Gauge[Int] {
+  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
       def value = requestQueue.size
   })

-  newGauge("ResponseQueueSize", new Gauge[Int]{
+  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
     def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
   def addProcessor(processor: Processor): Unit = {
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
+
+    newGauge(ResponseQueueSizeMetric,
+      new Gauge[Int] {
+        def value = processor.responseQueueSize
+      },
+      Map(ProcessorMetricTag -> processor.id.toString)
+    )
   }

   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
+    removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
   }

   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index fef412b7198..d37b5231594 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 }

+private[kafka] object Processor {
+  val IdlePercentMetricName = "IdlePercent"
+  val NetworkProcessorMetricTag = "networkProcessor"
+  val ListenerMetricTag = "listener"
+}
+
 /**
  * Thread that processes all requests from a single connection. There are N of these running in parallel
  * each of which has its own selector
@@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
                                memoryPool: MemoryPool,
                                logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+  import Processor._
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
       case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
@@ -471,18 +478,11 @@ private[kafka] class Processor(val id: Int,
   private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

   private[kafka] val metricTags = mutable.LinkedHashMap(
-    "listener" -> listenerName.value,
-    "networkProcessor" -> id.toString
+    ListenerMetricTag -> listenerName.value,
+    NetworkProcessorMetricTag -> id.toString
   ).asJava

-  newGauge("ResponseQueueSize",
-    new Gauge[Int] {
-      def value = responseQueue.size()
-    },
-
[jira] [Commented] (KAFKA-6208) Reduce startup time for Kafka Connect workers
[ https://issues.apache.org/jira/browse/KAFKA-6208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357745#comment-16357745 ] Satyajit varma commented on KAFKA-6208: --- [~rhauch], we could make the connectors, SMTs, and converters load asynchronously, like below:
{code}
CompletableFuture.runAsync(() -> {
    addPlugins(plugins.connectors(), loader);
    connectors.addAll(plugins.connectors());
}, Executors.newSingleThreadExecutor());

CompletableFuture.runAsync(() -> {
    addPlugins(plugins.converters(), loader);
    converters.addAll(plugins.converters());
}, Executors.newSingleThreadExecutor());

CompletableFuture.runAsync(() -> {
    addPlugins(plugins.transformations(), loader);
    transformations.addAll(plugins.transformations());
}, Executors.newSingleThreadExecutor());
{code}
> Reduce startup time for Kafka Connect workers > - > > Key: KAFKA-6208 > URL: https://issues.apache.org/jira/browse/KAFKA-6208 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 1.0.0 > Reporter: Randall Hauch > Priority: Major > > Kafka Connect startup times are excessive with a handful of connectors on the > plugin path or classpath. We should not be scanning three times (once for > connectors, once for SMTs, and once for converters), and hopefully we can > avoid scanning directories that are clearly not plugin directories. > We should also consider using Java's Service Loader to quickly identify > connectors. The latter would require a KIP and would require time for > connectors to migrate, but we could be smarter about only scanning plugin > directories that need to be scanned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
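One caveat with the sketch above (my addition, not part of the original suggestion): the worker still needs to wait for all three scans to complete before it starts resolving plugins, and the throwaway executors should be shut down. With CompletableFuture that is an allOf/join at the end; a self-contained sketch with placeholder scan bodies, assuming Scala 2.12+ SAM conversion for the Runnable:

{code:scala}
import java.util.concurrent.{CompletableFuture, Executors}

object PluginScanJoin extends App {
  val pool = Executors.newFixedThreadPool(3)

  // placeholder for the real work, i.e. the three addPlugins(...) calls
  def scan(kind: String): CompletableFuture[Void] =
    CompletableFuture.runAsync(() => println(s"scanning $kind plugins"), pool)

  val scans = Seq(scan("connectors"), scan("converters"), scan("transformations"))

  // do not let the worker proceed until every scan has finished
  CompletableFuture.allOf(scans: _*).join()
  pool.shutdown()
}
{code}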
[jira] [Comment Edited] (KAFKA-6388) Error while trying to roll a segment that already exists
[ https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357331#comment-16357331 ] Paul Davidson edited comment on KAFKA-6388 at 2/9/18 12:03 AM: --- [~hachikuji] We saw this on non-compacted topics. was (Author: pdavidson): [~dhay] We saw this on non-compacted topics. > Error while trying to roll a segment that already exists > > > Key: KAFKA-6388 > URL: https://issues.apache.org/jira/browse/KAFKA-6388 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.0.0 > Reporter: David Hay > Priority: Blocker > > Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in > our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2). > After spending 30 min or more spewing log messages like this: > {noformat} > [2017-12-19 16:44:28,998] INFO Replica loaded for partition > screening.save.results.screening.save.results.processor.error-43 with initial > high watermark 0 (kafka.cluster.Replica) > {noformat} > Eventually, the replica thread throws the error below (also referenced in the > original issue). If I remove that partition from the data directory and > bounce the broker, it eventually rebalances (assuming it doesn't hit a > different partition with the same error). > {noformat} > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 0002.log already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 0002.index already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 0002.timeindex already exists; deleting it first > (kafka.log.Log) > [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions __consumer_offsets-20 > (kafka.server.ReplicaFetcherManager) > [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > sr.new.sr.new.processor.error-38 offset 2 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Caused by: kafka.common.KafkaException: Trying to roll a new log segment for > topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it > already exists. > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338) > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.roll(Log.scala:1297) > at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.append(Log.scala:624) > at kafka.log.Log.appendAsFollower(Log.scala:607) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41) > at >
[jira] [Created] (KAFKA-6548) Migrate committed offsets from ZooKeeper to Kafka
Manikandan P created KAFKA-6548: --- Summary: Migrate committed offsets from ZooKeeper to Kafka Key: KAFKA-6548 URL: https://issues.apache.org/jira/browse/KAFKA-6548 Project: Kafka Issue Type: Improvement Components: offset manager Affects Versions: 0.10.0.0 Environment: Windows Reporter: Manikandan P We were using a previous version of Kafka (0.8.X), where all the offset details were stored in ZooKeeper. Now we have moved to a new version of Kafka (0.10.X), where all the topic offset details are stored in Kafka itself. We have to move all the topic offset details from ZooKeeper to Kafka for an existing application in production. Kafka is installed on a Windows machine, and we can't run kafka-consumer-groups.sh from Windows. Please advise how to migrate committed offsets from ZooKeeper to Kafka. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
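Two notes on this, with assumptions flagged: Kafka distributions ship Windows batch equivalents of the shell tools under bin\windows (e.g. kafka-consumer-groups.bat), which may remove the blocker directly. Failing that, offsets can be migrated programmatically: read the old offsets out of ZooKeeper (/consumers/<group>/offsets/<topic>/<partition>) and commit them to Kafka with a consumer using the same group.id, while the old consumers are stopped (commits from outside an active group are rejected otherwise). A minimal sketch with placeholder names and values:

{code:scala}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object MigrateZkOffsets extends App {
  // placeholder values, as if already read from ZooKeeper
  val zkOffsets = Map(
    new TopicPartition("my-topic", 0) -> 42L,
    new TopicPartition("my-topic", 1) -> 17L)

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "my-group") // must match the old ZooKeeper-based group id
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val toCommit = zkOffsets.map { case (tp, off) => tp -> new OffsetAndMetadata(off) }.asJava
    consumer.commitSync(toCommit) // writes the offsets to the __consumer_offsets topic
  } finally consumer.close()
}
{code}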
[jira] [Created] (KAFKA-6547) group offset reset and begin_offset ignored/no effect
Dan created KAFKA-6547: -- Summary: group offset reset and begin_offset ignored/no effect Key: KAFKA-6547 URL: https://issues.apache.org/jira/browse/KAFKA-6547 Project: Kafka Issue Type: Bug Components: offset manager Affects Versions: 1.0.0 Environment: ubuntu 16, java 1.8 Reporter: Dan Fix For: 0.11.0.2 Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest or anything> has no effect in 1.0. When my group client connects and requests a specific offset, or the earliest, there is no effect and the consumer is unable to poll, so all messages, even new ones, are ignored. I installed 0.11 and these problems do not manifest. I'm unfamiliar with the internals and put the offset manager as the possible component, but that's a guess. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
Rajini Sivaram created KAFKA-6546: - Summary: Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener Key: KAFKA-6546 URL: https://issues.apache.org/jira/browse/KAFKA-6546 Project: Kafka Issue Type: Improvement Components: core Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.2.0 In 1.1, if an endpoint is available on the broker processing a metadata request, but the corresponding listener is not available on the leader of a partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration, where some brokers are not configured with all listeners, or it could indicate a transient error when listeners are dynamically added. We want to treat the error as a transient error to process dynamic updates, but we should notify clients of the actual error. This change should be made when the MetadataRequest version is updated, so that LEADER_NOT_AVAILABLE is returned to older clients. See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] and [https://github.com/apache/kafka/pull/4539] for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357626#comment-16357626 ] Ismael Juma commented on KAFKA-6529: [~damianguy] Yes, I think the PR should be merged today or tomorrow. [~rsivaram] and [~hachikuji] are on it. > Broker leaks memory and file descriptors after sudden client disconnects > > > Key: KAFKA-6529 > URL: https://issues.apache.org/jira/browse/KAFKA-6529 > Project: Kafka > Issue Type: Bug > Components: network > Affects Versions: 1.0.0, 0.11.0.2 > Reporter: Graham Campbell > Priority: Major > Fix For: 1.1.0, 0.11.0.3, 1.0.2 > > > If a producer forcefully disconnects from a broker while it has staged > receives, that connection enters a limbo state where it is no longer > processed by the SocketServer.Processor, leaking the file descriptor for the > socket and the memory used for the staged receive queue for that connection. > We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after > the rolling restart to upgrade, open file descriptors on the brokers started > climbing uncontrollably. In a few cases brokers reached our configured max > open files limit of 100k and crashed before we rolled back. > We tracked this down to a buildup of muted connections in the > Selector.closingChannels list. If a client disconnects from the broker with > multiple pending produce requests, when the broker attempts to send an ack to > the client it receives an IOException because the TCP socket has been closed. > This triggers the Selector to close the channel, but because it still has > pending requests, it adds it to Selector.closingChannels to process those > requests. However, because that exception was triggered by trying to send a > response, the SocketServer.Processor has marked the channel as muted and will > no longer process it at all. > *Reproduced by:* > Starting a Kafka broker/cluster > Client produces several messages and then disconnects abruptly (e.g. > _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) > Broker then leaks the file descriptor previously used for the TCP socket and the memory > for unprocessed messages > *Proposed solution (which we've implemented internally)* > Whenever an exception is encountered when writing to a socket in > Selector.pollSelectionKeys(...), record that that connection failed a send by > adding the KafkaChannel ID to Selector.failedSends. Then re-raise the > exception to still trigger the socket disconnection logic. Since every > exception raised in this function triggers a disconnect, we also treat any > exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357615#comment-16357615 ] Chong Wang commented on KAFKA-6545: --- [~ijuma] Thanks, I was thinking it's AdminClient.scala. > AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0 > - > > Key: KAFKA-6545 > URL: https://issues.apache.org/jira/browse/KAFKA-6545 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 1.0.0 > Reporter: Chong Wang > Priority: Major > > Not sure why this function was deleted > ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)] > Our code was relying on it. > {quote}final MetadataResponse.TopicMetadata topicMetadata = > AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); > final Topic topic = new Topic(); > topic.setPartitions(topicMetadata.partitionMetadata().size()); > final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e > -> e.replicas().size()).sum(); > topic.setReplications(replicas); > {quote} > Are there any alternatives? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chong Wang resolved KAFKA-6545. --- Resolution: Not A Problem > AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0 > - > > Key: KAFKA-6545 > URL: https://issues.apache.org/jira/browse/KAFKA-6545 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 1.0.0 > Reporter: Chong Wang > Priority: Major > > Not sure why this function was deleted > ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)] > Our code was relying on it. > {quote}final MetadataResponse.TopicMetadata topicMetadata = > AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); > final Topic topic = new Topic(); > topic.setPartitions(topicMetadata.partitionMetadata().size()); > final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e > -> e.replicas().size()).sum(); > topic.setReplications(replicas); > {quote} > Are there any alternatives? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6538: --- Affects Version/s: (was: 1.2.0) 1.1.0 > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.1.0 > Reporter: Matthias J. Sax > Priority: Minor > > In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the innermost {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails, as it only handles plain > bytes. > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowStore, and SessionStore). > Cf. https://github.com/apache/kafka/pull/4518, which cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists
[ https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357588#comment-16357588 ] David Hay commented on KAFKA-6388: -- I'm pretty sure we're not using compacted topics. It's whatever is the default out of the box. > Error while trying to roll a segment that already exists > > > Key: KAFKA-6388 > URL: https://issues.apache.org/jira/browse/KAFKA-6388 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.0.0 > Reporter: David Hay > Priority: Blocker > > Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in > our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2). > After spending 30 min or more spewing log messages like this: > {noformat} > [2017-12-19 16:44:28,998] INFO Replica loaded for partition > screening.save.results.screening.save.results.processor.error-43 with initial > high watermark 0 (kafka.cluster.Replica) > {noformat} > Eventually, the replica thread throws the error below (also referenced in the > original issue). If I remove that partition from the data directory and > bounce the broker, it eventually rebalances (assuming it doesn't hit a > different partition with the same error). > {noformat} > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 0002.log already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 0002.index already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 0002.timeindex already exists; deleting it first > (kafka.log.Log) > [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions __consumer_offsets-20 > (kafka.server.ReplicaFetcherManager) > [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > sr.new.sr.new.processor.error-38 offset 2 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Caused by: kafka.common.KafkaException: Trying to roll a new log segment for > topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it > already exists. > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338) > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.roll(Log.scala:1297) > at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.append(Log.scala:624) > at kafka.log.Log.appendAsFollower(Log.scala:607) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184) > ... 13 more >
[jira] [Commented] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357577#comment-16357577 ] Ismael Juma commented on KAFKA-6545: The commit description states why they were removed (they were internal APIs with no tests). The alternative is to use AdminClient (a public and supported API): https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java > AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0 > - > > Key: KAFKA-6545 > URL: https://issues.apache.org/jira/browse/KAFKA-6545 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 1.0.0 > Reporter: Chong Wang > Priority: Major > > Not sure why this function was deleted > ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)] > Our code was relying on it. > {quote}final MetadataResponse.TopicMetadata topicMetadata = > AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); > final Topic topic = new Topic(); > topic.setPartitions(topicMetadata.partitionMetadata().size()); > final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e > -> e.replicas().size()).sum(); > topic.setReplications(replicas); > {quote} > Are there any alternatives? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
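To make Ismael's pointer concrete, the snippet from the description could be ported to the public AdminClient roughly as follows. A sketch only, not a drop-in replacement; topicName and the bootstrap address are placeholders, and it assumes a 1.0+ client on the classpath:

{code:scala}
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient

object DescribeTopicExample extends App {
  val topicName = "my-topic" // placeholder
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")

  val admin = AdminClient.create(props)
  try {
    // describeTopics replaces AdminUtils.fetchTopicMetadataFromZk: it asks
    // the brokers for metadata rather than reading ZooKeeper directly
    val description = admin.describeTopics(Collections.singleton(topicName)).all.get.get(topicName)
    val partitions = description.partitions.asScala
    val replicaCount = partitions.map(_.replicas.size).sum
    println(s"$topicName: ${partitions.size} partitions, $replicaCount replicas")
  } finally admin.close()
}
{code}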
[jira] [Updated] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chong Wang updated KAFKA-6545: -- Description: Not sure why this function was deleted ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)] Our code was relying on it. {quote}final MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); final Topic topic = new Topic(); topic.setPartitions(topicMetadata.partitionMetadata().size()); final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> e.replicas().size()).sum(); topic.setReplications(replicas); {quote} Are there any alternatives? was: Not sure why this function was deleted; our code was relying on it. final MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); final Topic topic = new Topic(); topic.setPartitions(topicMetadata.partitionMetadata().size()); final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> e.replicas().size()).sum(); topic.setReplications(replicas); > AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0 > - > > Key: KAFKA-6545 > URL: https://issues.apache.org/jira/browse/KAFKA-6545 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 1.0.0 > Reporter: Chong Wang > Priority: Major > > Not sure why this function was deleted > ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)] > Our code was relying on it. > {quote}final MetadataResponse.TopicMetadata topicMetadata = > AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); > final Topic topic = new Topic(); > topic.setPartitions(topicMetadata.partitionMetadata().size()); > final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e > -> e.replicas().size()).sum(); > topic.setReplications(replicas); > {quote} > Are there any alternatives? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
Chong Wang created KAFKA-6545: - Summary: AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0 Key: KAFKA-6545 URL: https://issues.apache.org/jira/browse/KAFKA-6545 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 1.0.0 Reporter: Chong Wang Not sure why this function was deleted; our code was relying on it.
final MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
final Topic topic = new Topic();
topic.setPartitions(topicMetadata.partitionMetadata().size());
final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> e.replicas().size()).sum();
topic.setReplications(replicas);
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"
[ https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Yang updated KAFKA-6544: --- Description: Our kafka cluster encountered a few disk/xfs failures in the cloud vm instances. When a disk/xfs failure happens, the kafka process did not exit gracefully. Instead, it ran into "" status, with port 9092 still being reachable. When failures like this happen, kafka should shut down all threads and exit. The following is the kafka logs when the failure happens:
{code:java}
[2018-02-08 12:52:31,764] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:340)
        at kafka.network.Acceptor.run(SocketServer.scala:283)
        at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:340)
        at kafka.network.Acceptor.run(SocketServer.scala:283)
        at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:340)
        at kafka.network.Acceptor.run(SocketServer.scala:283)
        at java.lang.Thread.run(Thread.java:748)
{code}
was: Our kafka cluster encountered a few disk/xfs failures in the cloud vm instances. When a disk/xfs failure happens, the kafka process did not exit gracefully. Instead, it ran into "" status, with port 9092 still being reachable. When failures like this happen, kafka should shut down all threads and exit. The following is the kafka logs when the failure happens:
{code:java}
[2018-02-08 12:52:31,764] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:340)
        at kafka.network.Acceptor.run(SocketServer.scala:283)
        at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:340)
        at kafka.network.Acceptor.run(SocketServer.scala:283)
        at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:340)
        at kafka.network.Acceptor.run(SocketServer.scala:283)
        at java.lang.Thread.run(Thread.java:748)
{code}
> kafka process should exit when it encounters "java.io.IOException: Too many > open files" > - > > Key: KAFKA-6544 > URL: https://issues.apache.org/jira/browse/KAFKA-6544 > Project: Kafka > Issue Type: Bug > Components: admin, network > Affects Versions: 0.10.2.1 > Reporter: Yu Yang > Priority: Major > > Our kafka cluster encountered a few disk/xfs failures in the cloud vm > instances. When a disk/xfs failure happens, the kafka process did not exit > gracefully. Instead, it ran into "" status, with port 9092 still being >
[jira] [Commented] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"
[ https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357483#comment-16357483 ] Yu Yang commented on KAFKA-6544: [~cmccabe] The kafka process is in `` status. sudo ls -l /proc/$kafka_pid/fd returns 0. I am also including "netstat -pnt" output here. Connections are either in ESTABLISHED or CLOSE_WAIT status.
{code}
proc/30413/fd]# sudo ls -l /proc/30413/fd
total 0
{code}
{code}
netstat -pnt | grep "10.1.160.124:9092" | wc
116 812 11252
{code}
{code}
netstat -pnt | grep "10.1.160.124:9092"
tcp 29 0 10.1.160.124:9092 10.1.25.241:55616 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:58624 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.9.121:33894 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:53886 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:43122 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:50766 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.26.165:34282 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.79.149:47682 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.163.135:44008 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.66.116:52398 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.64.116:36656 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.207.247:51904 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.9.16:45942 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.131.15:57118 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:55974 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.214.5:33040 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:33494 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.201.139:60230 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.207.247:51792 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:42858 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:44246 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.194.26:42406 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:32902 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.169.94:35532 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.193.101:48832 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.204.225:60946 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:35772 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:46972 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:56226 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:46432 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:44436 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:4 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:47364 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:44908 ESTABLISHED -
tcp 29 0 10.1.160.124:9092 10.1.25.241:43060 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.10.15:39282 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.181.86:55500 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.17.191:32812 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.141.30:52024 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.76.141:51366 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:50940 ESTABLISHED -
tcp 65 0 10.1.160.124:9092 10.1.11.196:44064 CLOSE_WAIT -
tcp 65 0 10.1.160.124:9092 10.1.143.107:37116 CLOSE_WAIT -
tcp 29 0 10.1.160.124:9092 10.1.25.241:37416 ESTABLISHED -
tcp 65 0 10.1.160.124:9092
[jira] [Updated] (KAFKA-6390) Update ZooKeeper to 3.4.11, Gradle and other minor updates
[ https://issues.apache.org/jira/browse/KAFKA-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6390: --- Fix Version/s: (was: 2.0.0) 1.1.0 > Update ZooKeeper to 3.4.11, Gradle and other minor updates > -- > > Key: KAFKA-6390 > URL: https://issues.apache.org/jira/browse/KAFKA-6390 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 1.1.0 > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2614 is a helpful fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read
[ https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radai rosenblatt updated KAFKA-6345: Fix Version/s: 1.1.0 > NetworkClient.inFlightRequestCount() is not thread safe, causing > ConcurrentModificationExceptions when sensors are read > --- > > Key: KAFKA-6345 > URL: https://issues.apache.org/jira/browse/KAFKA-6345 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 1.0.0 > Reporter: radai rosenblatt > Assignee: Sean McCauliff > Priority: Major > Fix For: 1.1.0 > > > example stack trace (code is ~0.10.2.*) > {code} > java.util.ConcurrentModificationException: > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) > at java.util.HashMap$ValueIterator.next(HashMap.java:1458) > at > org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109) > at > org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382) > at > org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > {code} > Looking at latest trunk, the code is still vulnerable: > # NetworkClient.inFlightRequestCount() eventually iterates over > InFlightRequests.requests.values(), which is backed by a (non-thread-safe) > HashMap > # this will be called from the "requests-in-flight" sensor's measure() method > (Sender.java line ~765 in the SenderMetrics ctor), which would be driven by some > thread reading JMX values > # the HashMap in question would also be updated by some client io thread calling > NetworkClient.doSend() - which calls into InFlightRequests.add() > I guess the only upside is that this exception will always happen on the > thread reading the JMX values and never on the actual client io thread ... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read
[ https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radai rosenblatt resolved KAFKA-6345. - Resolution: Fixed > NetworkClient.inFlightRequestCount() is not thread safe, causing > ConcurrentModificationExceptions when sensors are read > --- > > Key: KAFKA-6345 > URL: https://issues.apache.org/jira/browse/KAFKA-6345 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 1.0.0 > Reporter: radai rosenblatt > Assignee: Sean McCauliff > Priority: Major > Fix For: 1.1.0 > > > example stack trace (code is ~0.10.2.*) > {code} > java.util.ConcurrentModificationException: > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) > at java.util.HashMap$ValueIterator.next(HashMap.java:1458) > at > org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109) > at > org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382) > at > org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > {code} > Looking at latest trunk, the code is still vulnerable: > # NetworkClient.inFlightRequestCount() eventually iterates over > InFlightRequests.requests.values(), which is backed by a (non-thread-safe) > HashMap > # this will be called from the "requests-in-flight" sensor's measure() method > (Sender.java line ~765 in the SenderMetrics ctor), which would be driven by some > thread reading JMX values > # the HashMap in question would also be updated by some client io thread calling > NetworkClient.doSend() - which calls into InFlightRequests.add() > I guess the only upside is that this exception will always happen on the > thread reading the JMX values and never on the actual client io thread ... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read
[ https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radai rosenblatt reassigned KAFKA-6345: --- Assignee: Sean McCauliff > NetworkClient.inFlightRequestCount() is not thread safe, causing > ConcurrentModificationExceptions when sensors are read > --- > > Key: KAFKA-6345 > URL: https://issues.apache.org/jira/browse/KAFKA-6345 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 1.0.0 > Reporter: radai rosenblatt > Assignee: Sean McCauliff > Priority: Major > > example stack trace (code is ~0.10.2.*) > {code} > java.util.ConcurrentModificationException: > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) > at java.util.HashMap$ValueIterator.next(HashMap.java:1458) > at > org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109) > at > org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382) > at > org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > {code} > Looking at latest trunk, the code is still vulnerable: > # NetworkClient.inFlightRequestCount() eventually iterates over > InFlightRequests.requests.values(), which is backed by a (non-thread-safe) > HashMap > # this will be called from the "requests-in-flight" sensor's measure() method > (Sender.java line ~765 in the SenderMetrics ctor), which would be driven by some > thread reading JMX values > # the HashMap in question would also be updated by some client io thread calling > NetworkClient.doSend() - which calls into InFlightRequests.add() > I guess the only upside is that this exception will always happen on the > thread reading the JMX values and never on the actual client io thread ... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
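For readers hitting the same pattern elsewhere: the generic fix for this class of bug is to stop iterating the shared map from the metrics thread and instead publish a counter that is updated on the mutating path. A sketch of the idea only, not the actual Kafka patch:

{code:scala}
import java.util.concurrent.atomic.AtomicInteger

// Sketch: the request map stays single-threaded (io thread only); the count
// alone is published through an AtomicInteger, which any thread (e.g. a JMX
// metrics reader) can read without ever touching the HashMap.
class InFlightRequestCounter {
  private val inFlight = new AtomicInteger(0)

  def add(): Unit = inFlight.incrementAndGet()      // io thread, on doSend()
  def complete(): Unit = inFlight.decrementAndGet() // io thread, on response/disconnect

  def count: Int = inFlight.get                     // lock-free read for sensors
}
{code}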
[jira] [Commented] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"
[ https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357456#comment-16357456 ] Colin P. McCabe commented on KAFKA-6544: What are the open file descriptors that you are seeing? Can you get the output of {{sudo ls -l /proc/$KAFKA_PID/fd}} (where KAFKA_PID is the process ID of Kafka)? And also perhaps {{netstat -pnt}}? > kafka process should exit when it encounters "java.io.IOException: Too many > open files" > - > > Key: KAFKA-6544 > URL: https://issues.apache.org/jira/browse/KAFKA-6544 > Project: Kafka > Issue Type: Bug > Components: admin, network > Affects Versions: 0.10.2.1 > Reporter: Yu Yang > Priority: Major > > Our kafka cluster encountered a few disk/xfs failures in the cloud vm > instances. When a disk/xfs failure happens, the kafka process did not exit > gracefully. Instead, it ran into "" status, with port 9092 still being > reachable. When failures like this happen, kafka should shut down all > threads and exit. The following is the kafka logs when the failure happens: > {code:java} > [2018-02-08 12:52:31,764] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:340) > at kafka.network.Acceptor.run(SocketServer.scala:283) > at java.lang.Thread.run(Thread.java:748) > [2018-02-08 12:52:31,772] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:340) > at kafka.network.Acceptor.run(SocketServer.scala:283) > at java.lang.Thread.run(Thread.java:748) > [2018-02-08 12:52:31,772] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:340) > at kafka.network.Acceptor.run(SocketServer.scala:283) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"
[ https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357419#comment-16357419 ] Ted Yu commented on KAFKA-6544: --- One option is to shut down the process in SocketServer: {code} case e: ControlThrowable => throw e case e: Throwable => error("Error occurred", e) {code} upon seeing an IOException with the proper message. > kafka process should exit when it encounters "java.io.IOException: Too many > open files" > - > > Key: KAFKA-6544 > URL: https://issues.apache.org/jira/browse/KAFKA-6544 > Project: Kafka > Issue Type: Bug > Components: admin, network >Affects Versions: 0.10.2.1 >Reporter: Yu Yang >Priority: Major > > Our kafka cluster encountered a few disk/xfs failures in the cloud vm instances. When a disk/xfs failure happens, the kafka process did not exit gracefully. Instead, it ran into "" status, with port 9092 still reachable. When failures like this happen, kafka should shut down all threads and exit. The following is the kafka log when the failure happens: > {code:java} > [2018-02-08 12:52:31,764] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:340) > at kafka.network.Acceptor.run(SocketServer.scala:283) > at java.lang.Thread.run(Thread.java:748) > [2018-02-08 12:52:31,772] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:340) > at kafka.network.Acceptor.run(SocketServer.scala:283) > at java.lang.Thread.run(Thread.java:748) > [2018-02-08 12:52:31,772] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:340) > at kafka.network.Acceptor.run(SocketServer.scala:283) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
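A minimal sketch of that idea (in Java for illustration; the actual SocketServer is Scala, and the message check and exit code here are assumptions, not the merged fix):

{code:java}
import java.io.IOException;

// Hypothetical sketch: treat "Too many open files" as unrecoverable and halt
// the process instead of looping in the acceptor logging the same error.
public final class FatalIoErrors {
    private FatalIoErrors() {}

    public static void handleAcceptError(IOException e) {
        String msg = e.getMessage();
        if (msg != null && msg.contains("Too many open files")) {
            System.err.println("Unrecoverable I/O error, halting broker: " + msg);
            Runtime.getRuntime().halt(1); // skip shutdown hooks; the fd table is exhausted
        }
        // otherwise log and keep accepting
    }
}
{code}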
[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists
[ https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357401#comment-16357401 ] Graham Campbell commented on KAFKA-6388: Most of the recent times we've run into this it's been on non-compacted topics that have been idle for a while (no data in for > retention.ms) and then begin receiving data again. It's not happening to every replica: with 15 partitions and 3 replicas, sometimes one or two followers will encounter this for a given topic. > Error while trying to roll a segment that already exists > > > Key: KAFKA-6388 > URL: https://issues.apache.org/jira/browse/KAFKA-6388 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: David Hay >Priority: Blocker > > Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in > our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2). > After spending 30 min or more spewing log messages like this: > {noformat} > [2017-12-19 16:44:28,998] INFO Replica loaded for partition > screening.save.results.screening.save.results.processor.error-43 with initial > high watermark 0 (kafka.cluster.Replica) > {noformat} > Eventually, the replica thread throws the error below (also referenced in the > original issue). If I remove that partition from the data directory and > bounce the broker, it eventually rebalances (assuming it doesn't hit a > different partition with the same error). > {noformat} > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.log already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.index already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.timeindex already exists; deleting it first > (kafka.log.Log) > [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions __consumer_offsets-20 > (kafka.server.ReplicaFetcherManager) > [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > sr.new.sr.new.processor.error-38 offset 2 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167) > at > 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Caused by: kafka.common.KafkaException: Trying to roll a new log segment for > topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it > already exists. > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338) > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.roll(Log.scala:1297) > at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.append(Log.scala:624) > at kafka.log.Log.appendAsFollower(Log.scala:607) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102) > at >
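For reference, log segment files are named after the 20-digit zero-padded base offset of the segment, which is why rolling again at an offset whose files are still on disk collides. A minimal illustration:

{code:java}
// Kafka names segment files by zero-padding the segment's base offset to 20
// digits, e.g. offset 2 -> 00000000000000000002.log/.index/.timeindex.
// Rolling a new segment at a base offset that already has files on disk
// therefore collides with the existing ones.
public class SegmentNames {
    static String logFile(long baseOffset) {
        return String.format("%020d", baseOffset) + ".log";
    }

    public static void main(String[] args) {
        System.out.println(logFile(2L)); // 00000000000000000002.log
    }
}
{code}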
[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists
[ https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357331#comment-16357331 ] Paul Davidson commented on KAFKA-6388: -- [~dhay] We saw this on non-compacted topics. > Error while trying to roll a segment that already exists > > > Key: KAFKA-6388 > URL: https://issues.apache.org/jira/browse/KAFKA-6388 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: David Hay >Priority: Blocker > > Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in > our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2). > After spending 30 min or more spewing log messages like this: > {noformat} > [2017-12-19 16:44:28,998] INFO Replica loaded for partition > screening.save.results.screening.save.results.processor.error-43 with initial > high watermark 0 (kafka.cluster.Replica) > {noformat} > Eventually, the replica thread throws the error below (also referenced in the > original issue). If I remove that partition from the data directory and > bounce the broker, it eventually rebalances (assuming it doesn't hit a > different partition with the same error). > {noformat} > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.log already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.index already exists; deleting it first (kafka.log.Log) > [2017-12-19 15:16:24,227] WARN Newly rolled segment file > 00000000000000000002.timeindex already exists; deleting it first > (kafka.log.Log) > [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions __consumer_offsets-20 > (kafka.server.ReplicaFetcherManager) > [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > sr.new.sr.new.processor.error-38 offset 2 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Caused by: kafka.common.KafkaException: Trying to roll a new log segment for > topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists. > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338) > at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.roll(Log.scala:1297) > at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.append(Log.scala:624) > at kafka.log.Log.appendAsFollower(Log.scala:607) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184) > ... 13 more > [2017-12-19 15:16:24,302] INFO
[jira] [Resolved] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
[ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6362. Resolution: Fixed Fix Version/s: 1.1.0 > auto commit not work since coordinatorUnknown() is always true. > --- > > Key: KAFKA-6362 > URL: https://issues.apache.org/jira/browse/KAFKA-6362 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 0.10.2.1 >Reporter: Renkai Ge >Assignee: huxihx >Priority: Major > Fix For: 1.1.0 > > > {code} > [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, > 11.192.73.66:3002] > check.crcs = true > client.id = > connections.max.idle.ms = 540000 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = tcprtdetail_flink > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 300000 > max.poll.records = 500 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1 > [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : > e89bffd6b2eff799 > {code} > My Kafka Java client cannot auto-commit. After adding some debug logging, I found that the coordinatorUnknown() function in > [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604] > always returns true, and nextAutoCommitDeadline just increases infinitely. Should there be a lookupCoordinator() call after line 604, like in [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]? After I added lookupCoordinator() next to line 604, the consumer can auto-commit offsets properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
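The reporter's suggestion, sketched against the pre-fix logic (hedged; fields and helpers below are stubs standing in for ConsumerCoordinator internals, and the merged fix, shown in the pull request that follows, restructured the method differently):

{code:java}
// Sketch of the reporter's suggestion: when the coordinator is unknown at
// auto-commit time, actively start coordinator discovery instead of only
// pushing the deadline back, so the deadline can eventually be met.
public class AutoCommitSketch {
    private final boolean autoCommitEnabled = true;
    private final long retryBackoffMs = 100L;
    private final long autoCommitIntervalMs = 5000L;
    private long nextAutoCommitDeadline;

    void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled) {
            if (coordinatorUnknown()) {
                lookupCoordinator();                 // the proposed addition
                nextAutoCommitDeadline = now + retryBackoffMs;
            } else if (now >= nextAutoCommitDeadline) {
                nextAutoCommitDeadline = now + autoCommitIntervalMs;
                doAutoCommitOffsetsAsync();
            }
        }
    }

    boolean coordinatorUnknown() { return true; }    // stub
    void lookupCoordinator() { /* stub */ }
    void doAutoCommitOffsetsAsync() { /* stub */ }
}
{code}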
[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
[ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357298#comment-16357298 ] ASF GitHub Bot commented on KAFKA-6362: --- hachikuji closed pull request #4326: KAFKA-6362: maybeAutoCommitOffsetsAsync should try to discover coordinator URL: https://github.com/apache/kafka/pull/4326 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
{code}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f847cd8..2f7fd58a66f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public void assign(Collection<TopicPartition> partitions) {
             // make sure the offsets of topic partitions the consumer is unsubscribing from
             // are committed since there will be no following rebalance
-            this.coordinator.maybeAutoCommitOffsetsNow();
+            this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
             log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
             this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60eee82..d7c1ce9966e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets,
             public void onSuccess(Void value) {
                 pendingAsyncCommits.decrementAndGet();
                 doCommitOffsetsAsync(offsets, callback);
+                client.pollNoWakeup();
             }

             @Override
@@ -623,20 +624,10 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
         return false;
     }

-    private void maybeAutoCommitOffsetsAsync(long now) {
-        if (autoCommitEnabled) {
-            if (coordinatorUnknown()) {
-                this.nextAutoCommitDeadline = now + retryBackoffMs;
-            } else if (now >= nextAutoCommitDeadline) {
-                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
-                doAutoCommitOffsetsAsync();
-            }
-        }
-    }
-
-    public void maybeAutoCommitOffsetsNow() {
-        if (autoCommitEnabled && !coordinatorUnknown())
+    public void maybeAutoCommitOffsetsAsync(long now) {
+        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
             doAutoCommitOffsetsAsync();
+        }
     }
@@ -650,8 +641,11 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                     log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                     if (exception instanceof RetriableException)
                         nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+                    else
+                        nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 } else {
                     log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
+                    nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 }
             }
         });
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a71bea..c49339b6525 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public void testHeartbeatThreadClose() throws Exception {
             assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
     }

+    @Test
+    public void testAutoCommitAfterCoordinatorBackToService() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+
{code}
[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357279#comment-16357279 ] Jason Gustafson commented on KAFKA-6541: [~anhldbk] This was fixed in KAFKA-6366 which will be included in 1.0.1. The current RC is available here: [http://home.apache.org/~ewencp/kafka-1.0.1-rc0/]. If you have time to confirm the fix and close this JIRA, we would appreciate it. > StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread > -- > > Key: KAFKA-6541 > URL: https://issues.apache.org/jira/browse/KAFKA-6541 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Anh Le >Priority: Major > > There's something wrong with our client library when sending heartbeats. > This bug seems to be identical to this one: > [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] > > Here's the log: > > {quote}2018-02-08 13:55:01,102 ERROR > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread > Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | > default-group': > java.lang.StackOverflowError: null > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398) > at ch.qos.logback.classic.Logger.info(Logger.java:583) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at >
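The loop visible in the trace (coordinatorDead -> onFailure -> ... -> coordinatorDead) is mutual recursion between failure handlers. A minimal, self-contained sketch of the failure mode and the usual guard (names are hypothetical, not Kafka's):

{code:java}
// Hypothetical sketch of the failure mode: a failure callback that re-enters
// the code path that fired it recurses until the stack overflows. The guard
// is to make the state transition idempotent so re-entry becomes a no-op.
public class CoordinatorState {
    private boolean coordinatorKnown = true;

    void coordinatorDead() {
        if (!coordinatorKnown) {
            return; // already marked dead; break the recursion
        }
        coordinatorKnown = false;
        failPendingRequests(); // may call back into coordinatorDead()
    }

    private void failPendingRequests() {
        // failing a pending request invokes its onFailure handler, which in
        // the buggy version unconditionally called coordinatorDead() again
        coordinatorDead();
    }
}
{code}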
[jira] [Resolved] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables
[ https://issues.apache.org/jira/browse/KAFKA-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6543. Resolution: Duplicate > Allow KTables to be bootstrapped at start up, like GKTables > --- > > Key: KAFKA-6543 > URL: https://issues.apache.org/jira/browse/KAFKA-6543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > > In some use cases, it's desirable to have KTables "fully" bootstrapped (at least on a best-effort basis) before the topology begins, similar to how a GKTable does. This could prevent join race conditions for one, which could be a big problem if local KTable state has been lost. > > Related to KAFKA-6542 Tables should trigger joins too, not just streams -- This message was sent by Atlassian JIRA (v7.6.3#76005)
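For contrast, a hedged Streams snippet (topic names are made up): a GlobalKTable is restored to the end of its topic before processing starts, which is the behavior the issue asks for on regular KTables:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;

public class BootstrapExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Restored to the end of the topic before the topology starts processing.
        GlobalKTable<String, String> global = builder.globalTable("lookup-topic");

        // Starts processing immediately; joins against it can race with its
        // restoration if local state was lost -- the gap this issue describes.
        KTable<String, String> table = builder.table("other-lookup-topic");
    }
}
{code}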
[jira] [Commented] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357221#comment-16357221 ] Nick Travers commented on KAFKA-4914: - [~damianguy] - noticed this got updated recently. I've had a PR open for a while now (coming up on a year). [~ijuma] has been helping me iterate on it. Would love to get that merged if possible! > Partition re-assignment tool should check types before persisting state in > ZooKeeper > > > Key: KAFKA-4914 > URL: https://issues.apache.org/jira/browse/KAFKA-4914 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 0.10.1.1 >Reporter: Nick Travers >Assignee: Nick Travers >Priority: Major > Fix For: 2.0.0 > > > The partition-reassignment tool currently allows non-type-safe information to > be persisted into ZooKeeper, which can result in a ClassCastException at > runtime for brokers. > Specifically, this occurred when the broker assignment field was a List of > Strings, instead of a List of Integers. > {code} > 2017-03-15 01:44:04,572 ERROR > [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] > controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener > on Controller 10]: Error while handling broker changes > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at > kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436) > at > scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93) > at scala.collection.immutable.List.exists(List.scala:84) > at > kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436) > at > kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435) > at > scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) > at > scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247) > at > scala.collection.TraversableLike$class.filter(TraversableLike.scala:259) > at scala.collection.AbstractTraversable.filter(Traversable.scala:104) > at > kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356) > at > 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
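A minimal sketch of the kind of pre-flight type check the tool could apply before writing to ZooKeeper (hedged; illustrative only, not the actual patch):

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight check: reject a reassignment whose "replicas" list
// contains anything but integer broker ids, instead of letting brokers hit a
// ClassCastException at runtime.
public class ReassignmentValidator {
    static void validateReplicas(List<?> replicas) {
        for (Object brokerId : replicas) {
            if (!(brokerId instanceof Integer)) {
                throw new IllegalArgumentException(
                    "Broker id must be an integer, got: " + brokerId
                        + " (" + brokerId.getClass().getSimpleName() + ")");
            }
        }
    }

    public static void main(String[] args) {
        validateReplicas(Arrays.asList(1, 2, "3")); // throws: "3" is a String
    }
}
{code}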
[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy
[ https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357214#comment-16357214 ] Andrew Olson commented on KAFKA-2435: - [~jeffwidman] yes, that sounds quite reasonable to me. > More optimally balanced partition assignment strategy > - > > Key: KAFKA-2435 > URL: https://issues.apache.org/jira/browse/KAFKA-2435 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-2435.patch > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the original high-level consumer. For the new consumer, > see KAFKA-3297. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
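A compact, illustrative sketch of the strategy described above (hedged; not the attached patch): order topics by fewest subscribers, breaking ties by more partitions, then greedily hand each partition to the subscribed consumer with the fewest partitions so far.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the proposed strategy: process the most constrained
// topics first, then assign each partition to the subscribed consumer that
// currently holds the fewest partitions.
public class FairAssignor {
    // partitions: topic -> partition count; subscribers: topic -> consumers
    static Map<String, List<String>> assign(Map<String, Integer> partitions,
                                            Map<String, List<String>> subscribers) {
        Map<String, List<String>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();

        List<String> topics = new ArrayList<>(partitions.keySet());
        // fewer subscribers first; tie broken by more partitions first
        topics.sort(Comparator
                .comparingInt((String t) -> subscribers.get(t).size())
                .thenComparing(t -> -partitions.get(t)));

        for (String topic : topics) {
            for (int p = 0; p < partitions.get(topic); p++) {
                String best = subscribers.get(topic).stream()
                        .min(Comparator.comparingInt(c -> load.getOrDefault(c, 0)))
                        .get();
                load.merge(best, 1, Integer::sum);
                assignment.computeIfAbsent(best, c -> new ArrayList<>())
                          .add(topic + "-" + p);
            }
        }
        return assignment;
    }
}
{code}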
[jira] [Comment Edited] (KAFKA-2435) More optimally balanced partition assignment strategy
[ https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357203#comment-16357203 ] Jeff Widman edited comment on KAFKA-2435 at 2/8/18 4:54 PM: Can this be closed as wontfix? Two reasons: # it targets the deprecated (removed?) high-level consumer # KIP-54 addressed some of the concerns here (although possibly not all, as mentioned here: https://issues.apache.org/jira/browse/KAFKA-3297?focusedCommentId=15788229=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15788229) was (Author: jeffwidman): Can this be closed as wontfix? Two reasons: # it targets the deprecated (removed?) high-level consumer # KIP-54 addressed some of the concerns here, and IMHO is a better solution because it addresses both fairness and affinity/stickiness > More optimally balanced partition assignment strategy > - > > Key: KAFKA-2435 > URL: https://issues.apache.org/jira/browse/KAFKA-2435 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-2435.patch > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the original high-level consumer. For the new consumer, > see KAFKA-3297. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-2435) More optimally balanced partition assignment strategy
[ https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357203#comment-16357203 ] Jeff Widman edited comment on KAFKA-2435 at 2/8/18 4:51 PM: Can this be closed as wontfix? Two reasons: # it targets the deprecated (removed?) high-level consumer # KIP-54 addressed some of the concerns here, and IMHO is a better solution because it addresses both fairness and affinity/stickiness was (Author: jeffwidman): Can this be closed as wontfix? Two reasons: 1) it targets the deprecated (removed?) high-level consumer 2) KIP-54 addressed some of the concerns here, and IMHO is a better solution because it addresses both fairness and affinity/stickiness > More optimally balanced partition assignment strategy > - > > Key: KAFKA-2435 > URL: https://issues.apache.org/jira/browse/KAFKA-2435 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-2435.patch > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the original high-level consumer. For the new consumer, > see KAFKA-3297. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy
[ https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357203#comment-16357203 ] Jeff Widman commented on KAFKA-2435: Can this be closed as wontfix? Two reasons: 1) it targets the deprecated (removed?) high-level consumer 2) KIP-54 addressed some of the concerns here, and IMHO is a better solution because it addresses both fairness and affinity/stickiness > More optimally balanced partition assignment strategy > - > > Key: KAFKA-2435 > URL: https://issues.apache.org/jira/browse/KAFKA-2435 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-2435.patch > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the original high-level consumer. For the new consumer, > see KAFKA-3297. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6504) Connect: Some per-task-metrics not working
[ https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6504. Resolution: Fixed Fix Version/s: 1.1.0 > Connect: Some per-task-metrics not working > -- > > Key: KAFKA-6504 > URL: https://issues.apache.org/jira/browse/KAFKA-6504 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Per Steffensen >Assignee: Robert Yokota >Priority: Minor > Fix For: 1.1.0 > > > Some Kafka Connect metrics seem to be wrong with respect to per-task reporting - at least it seems like the MBean "kafka.connect:type=source-task-metrics,connector=,task=x" attribute "source-record-active-count" reports the same number for all x tasks running in the same Kafka Connect instance/JVM. E.g. if I have a source connector "my-connector" with 2 tasks that both run in the same Kafka Connect instance, but I know that only one of them actually produces anything (and therefore can have "active source-records"), both "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go up (following each other). It should only go up for the one task that actually produces something. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
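To check the per-task attribute directly, a small JMX sketch (connector/task names and the attribute come from the report; run inside the Connect JVM, or adapt it to a remote JMX connection):

{code:java}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Reads the per-task "source-record-active-count" attribute for task 0 and
// task 1 of connector "my-connector" from the local platform MBean server.
public class TaskMetricProbe {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        for (int task = 0; task <= 1; task++) {
            ObjectName name = new ObjectName(
                "kafka.connect:type=source-task-metrics,connector=my-connector,task=" + task);
            Object active = server.getAttribute(name, "source-record-active-count");
            System.out.println("task " + task + ": " + active);
        }
    }
}
{code}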
[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357015#comment-16357015 ] Damian Guy commented on KAFKA-6529: --- [~ijuma] should this go in to 1.1? > Broker leaks memory and file descriptors after sudden client disconnects > > > Key: KAFKA-6529 > URL: https://issues.apache.org/jira/browse/KAFKA-6529 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: Graham Campbell >Priority: Major > Fix For: 1.1.0, 0.11.0.3, 1.0.2 > > > If a producer forcefully disconnects from a broker while it has staged > receives, that connection enters a limbo state where it is no longer > processed by the SocketServer.Processor, leaking the file descriptor for the > socket and the memory used for the staged receive queue for that connection. > We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after > the rolling restart to upgrade, open file descriptors on the brokers started > climbing uncontrollably. In a few cases brokers reached our configured max > open files limit of 100k and crashed before we rolled back. > We tracked this down to a buildup of muted connections in the > Selector.closingChannels list. If a client disconnects from the broker with > multiple pending produce requests, when the broker attempts to send an ack to > the client it receives an IOException because the TCP socket has been closed. > This triggers the Selector to close the channel, but because it still has > pending requests, it adds it to Selector.closingChannels to process those > requests. However, because that exception was triggered by trying to send a > response, the SocketServer.Processor has marked the channel as muted and will > no longer process it at all. > *Reproduced by:* > Starting a Kafka broker/cluster > Client produces several messages and then disconnects abruptly (eg. > _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) > Broker then leaks file descriptor previously used for TCP socket and memory > for unprocessed messages > *Proposed solution (which we've implemented internally)* > Whenever an exception is encountered when writing to a socket in > Selector.pollSelectionKeys(...) record that that connection failed a send by > adding the KafkaChannel ID to Selector.failedSends. Then re-raise the > exception to still trigger the socket disconnection logic. Since every > exception raised in this function triggers a disconnect, we also treat any > exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
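A sketch of the proposed bookkeeping (hedged; the field and method names follow the description above, the rest is illustrative, not the actual Selector code):

{code:java}
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the proposed fix: remember which channel failed a
// send before re-raising, so the disconnect logic can skip the staged-receive
// processing that would otherwise strand the muted channel forever.
public class SelectorSketch {
    private final Set<String> failedSends = new HashSet<>();

    void write(String channelId, byte[] payload) throws IOException {
        try {
            doSocketWrite(channelId, payload);
        } catch (IOException e) {
            failedSends.add(channelId); // mark: this channel's pending receives must be dropped
            throw e;                    // still trigger the normal disconnect handling
        }
    }

    boolean sendFailed(String channelId) {
        return failedSends.contains(channelId);
    }

    private void doSocketWrite(String channelId, byte[] payload) throws IOException {
        // socket I/O elided
    }
}
{code}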
[jira] [Updated] (KAFKA-5919) Adding checks on "version" field for tools using it
[ https://issues.apache.org/jira/browse/KAFKA-5919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5919: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Adding checks on "version" field for tools using it > --- > > Key: KAFKA-5919 > URL: https://issues.apache.org/jira/browse/KAFKA-5919 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > Fix For: 2.0.0 > > > Hi, > the kafka-delete-records script allows the user to pass information about the records to delete through a JSON file. Such a file, as described in the command help, is made up of a "partitions" array and a "version" field. Reading > [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient] > and the DeleteRecords API (Key: 21) description, it's not clear what that field is for, and it's not even used at all (in the current implementation). > It turned out that the field is for having backward compatibility in the future, where the JSON format could change. This JIRA is about adding more checks on the "version" field, keeping it optional but assuming the earliest version (currently 1) if it's omitted from the JSON file. > The same applies to kafka-reassign-partitions, which takes a topics-to-move JSON file as input (used with the --generate option) and a partitions-to-move.json file (used with the --execute option). In both cases the same logic can be applied as above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
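For reference, a delete-records JSON file of the shape the description refers to (topic name, partition, and offset are made-up values):

{code}
{
  "partitions": [
    { "topic": "my-topic", "partition": 0, "offset": 100 }
  ],
  "version": 1
}
{code}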
[jira] [Updated] (KAFKA-6448) Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the annotation
[ https://issues.apache.org/jira/browse/KAFKA-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6448: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the > annotation > --- > > Key: KAFKA-6448 > URL: https://issues.apache.org/jira/browse/KAFKA-6448 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Hongyuan Li >Priority: Minor > Fix For: 2.0.0 > > Attachments: KAFKA-6448-1.patch, KAFKA-6448-2.patch > > > The annotation says > {code}*This feature must be enabled with -Dmx4jenable=true*{code} > *which is not consistent with the code* > {code} > ** > props.getBoolean("kafka_mx4jenable", false) > ** > {code} > Patch KAFKA-6448-1.patch modifies the code, and KAFKA-6448-2.patch modifies the annotation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
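The mismatch, illustrated as a hedged sketch (the property names come straight from the report; neither read below is the actual Mx4jLoader code):

{code:java}
import java.util.Properties;

// The doc comment promises a JVM flag, but the loader reads a broker property:
public class Mx4jFlagMismatch {
    public static void main(String[] args) {
        // what the annotation tells users to do: -Dmx4jenable=true
        boolean viaSystemProperty = Boolean.getBoolean("mx4jenable");

        // what the code actually checks: a "kafka_mx4jenable" broker property
        Properties props = new Properties();
        boolean viaBrokerProperty =
            Boolean.parseBoolean(props.getProperty("kafka_mx4jenable", "false"));

        System.out.println(viaSystemProperty + " vs " + viaBrokerProperty);
    }
}
{code}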
[jira] [Updated] (KAFKA-5532) Making bootstrap.servers property a first citizen option for the ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-5532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5532: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Making bootstrap.servers property a first citizen option for the > ProducerPerformance > > > Key: KAFKA-5532 > URL: https://issues.apache.org/jira/browse/KAFKA-5532 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Trivial > Fix For: 2.0.0 > > > Hi, > using the ProducerPerformance tool you have to specify the bootstrap.servers option through the producer-props or producer-config option. It would be better to have bootstrap.servers as a first-class option like in all the other tools, i.e. a dedicated --bootstrap-servers option. > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5517) Support linking to particular configuration parameters
[ https://issues.apache.org/jira/browse/KAFKA-5517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5517: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Support linking to particular configuration parameters > -- > > Key: KAFKA-5517 > URL: https://issues.apache.org/jira/browse/KAFKA-5517 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > Currently the configuration parameters are documented in long tables, and it's only possible to link to the heading before a particular table. When discussing configuration parameters on forums it would be helpful to be able to link to the particular parameter under discussion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5479) Docs for authorization omit authorizer.class.name
[ https://issues.apache.org/jira/browse/KAFKA-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5479: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Docs for authorization omit authorizer.class.name > - > > Key: KAFKA-5479 > URL: https://issues.apache.org/jira/browse/KAFKA-5479 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > The documentation in §7.4 Authorization and ACLs doesn't mention the > {{authorizer.class.name}} setting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5692: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Refactor PreferredReplicaLeaderElectionCommand to use AdminClient > - > > Key: KAFKA-5692 > URL: https://issues.apache.org/jira/browse/KAFKA-5692 > Project: Kafka > Issue Type: Improvement >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: kip, patch-available > Fix For: 2.0.0 > > > The PreferredReplicaLeaderElectionCommand currently uses a direct connection > to zookeeper. The zookeeper dependency should be deprecated and an > AdminClient API created to be used instead. > This change will require a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace
[ https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5359: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Exceptions from RequestFuture lack parts of the stack trace > --- > > Key: KAFKA-5359 > URL: https://issues.apache.org/jira/browse/KAFKA-5359 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Magnus Reftel >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > When an exception occurs within a task that reports its result using a > RequestFuture, that exception is stored in a field on the RequestFuture using > the {{raise}} method. In many places in the code where such futures are > completed, that exception is then thrown directly using {{throw > future.exception();}} (see e.g. > [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]). > This means that the exception that ends up in client code only has stack > traces related to the original exception, but nothing leading up to the > completion of the future. The client therefore gets no indication of what was > going on in the client code - only that it somehow ended up in the Kafka > libraries, and that a task failed at some point. > One solution to this is to use the exceptions from the future as causes for > chained exceptions, so that the client gets a stack trace that shows what the > client was doing, in addition to getting the stack traces for the exception > in the task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
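A small sketch of the chaining the description proposes (hedged; KafkaException and the stand-in future interface are for illustration, not the internal RequestFuture API):

{code:java}
import org.apache.kafka.common.KafkaException;

// Instead of rethrowing the task's exception directly (losing the caller's
// stack), wrap it as the cause so both stack traces survive.
public class FutureErrors {
    static <T> T result(RequestFutureLike<T> future) {
        if (future.failed()) {
            // the wrapper's stack trace shows what the client was doing;
            // getCause() keeps the original task-side trace
            throw new KafkaException("Request failed", future.exception());
        }
        return future.value();
    }

    // minimal stand-in for the client-internal RequestFuture API
    interface RequestFutureLike<T> {
        boolean failed();
        RuntimeException exception();
        T value();
    }
}
{code}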
[jira] [Updated] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4950: -- Fix Version/s: (was: 1.1.0) 2.0.0 > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Assignee: Sébastien Launay >Priority: Minor > Fix For: 2.0.0, 1.0.2 > > > It looks like when calling {{PartitionStates.partitionSet()}}, while the resulting HashMap is being built, the internal state of the allocations can change, which leads to a ConcurrentModificationException during the copy operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code}
> {code}
// client code:
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MetricName;

import static com.codahale.metrics.MetricRegistry.name;

public class KafkaMetricSet implements MetricSet {

    private final KafkaConsumer<?, ?> client;

    public KafkaMetricSet(KafkaConsumer<?, ?> client) {
        this.client = client;
    }

    @Override
    public Map<String, Metric> getMetrics() {
        final Map<String, Metric> gauges = new HashMap<>();
        Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
        for (final Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
            gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
                @Override
                public Double getValue() {
                    return e.getValue().value(); // exception thrown here
                }
            });
        }
        return Collections.unmodifiableMap(gauges);
    }
}
{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
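Until the race is fixed, a hedged client-side workaround sketch: metric reads that race with the consumer thread occasionally throw, so retry the read a few times rather than failing the whole metrics poll.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.function.Supplier;

// Workaround sketch (not a fix): retry a racy metric read a few times and
// fall back to NaN instead of propagating the exception to the reporter.
public class SafeMetricRead {
    static double read(Supplier<Double> metric) {
        for (int attempt = 0; attempt < 3; attempt++) {
            try {
                return metric.get();
            } catch (ConcurrentModificationException e) {
                // racy snapshot of the partition set; just try again
            }
        }
        return Double.NaN;
    }
}
{code}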
[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4794: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Add access to OffsetStorageReader from SourceConnector > -- > > Key: KAFKA-4794 > URL: https://issues.apache.org/jira/browse/KAFKA-4794 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Florian Hussonnois >Priority: Minor > Labels: needs-kip > Fix For: 2.0.0 > > > Currently the offsets storage is only accessible from a SourceTask, to be able to properly initialize tasks after a restart, a crash, or a reconfiguration request. > To implement more complex connectors that need to track the progress of each task, it would be helpful to have access to an OffsetStorageReader instance from the SourceConnector. > That way, we could have a background thread that requests a task reconfiguration based on source offsets. > This improvement proposal comes from a customer project that needs to periodically scan directories on a shared storage for detecting and streaming new files into Kafka. > The connector implementation is pretty straightforward. > The connector uses a background thread to periodically scan directories. When new input files are detected, a task reconfiguration is requested. Then the connector assigns a file subset to each task. > Each task stores source offsets for the last sent record. The source offsets data are: > - the size of the file > - the bytes offset > - the bytes size > Tasks become idle when the assigned files are completed (i.e. recordBytesOffsets + recordBytesSize = fileBytesSize). > The connector should then be able to track offsets for each assigned file. When all tasks have finished, the connector can stop them or assign new files by requesting a task reconfiguration. > Moreover, another advantage of monitoring source offsets from the connector is to detect slow or failed tasks and, if necessary, to restart all tasks. > If you think this improvement is OK, I can work on a pull request. > Thanks, -- This message was sent by Atlassian JIRA (v7.6.3#76005)
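A hypothetical skeleton of the connector pattern described above (hedged; the class and helper are made up, but ConnectorContext.requestTaskReconfiguration() is the existing framework hook; the OffsetStorageReader access this issue requests would let the scan also skip fully-processed files):

{code:java}
import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector skeleton: a background thread watches a directory
// and asks the framework to rebalance file assignments across tasks.
public abstract class DirectoryScanConnector extends SourceConnector {
    private volatile boolean running;

    @Override
    public void start(Map<String, String> props) {
        running = true;
        Thread scanner = new Thread(() -> {
            while (running) {
                if (newFilesDetected()) {
                    context.requestTaskReconfiguration(); // triggers taskConfigs()
                }
                try {
                    Thread.sleep(10_000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "directory-scanner");
        scanner.setDaemon(true);
        scanner.start();
    }

    @Override
    public void stop() {
        running = false;
    }

    abstract boolean newFilesDetected(); // directory-scan logic elided
}
{code}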
[jira] [Updated] (KAFKA-4126) No relevant log when the topic is non-existent
[ https://issues.apache.org/jira/browse/KAFKA-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4126: -- Fix Version/s: (was: 1.1.0) 2.0.0 > No relevant log when the topic is non-existent > -- > > Key: KAFKA-4126 > URL: https://issues.apache.org/jira/browse/KAFKA-4126 > Project: Kafka > Issue Type: Bug >Reporter: Balázs Barnabás >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > When a producer sends a ProducerRecord to a Kafka topic that doesn't exist, there is no relevant debug/error log that points out the error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
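Until the client logs something useful itself, the send callback is the reliable place to surface such failures (a hedged sketch; broker address and topic are made up, and with auto.create.topics.enable=true the topic would simply be created instead):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// A missing topic typically surfaces as a timeout on metadata, either thrown
// from send() after max.block.ms or passed to the callback; the callback
// makes sure it is at least reported somewhere.
public class SendWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("no-such-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("send failed: " + exception);
                    }
                });
        }
    }
}
{code}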
[jira] [Updated] (KAFKA-4893) async topic deletion conflicts with max topic length
[ https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4893: -- Fix Version/s: (was: 1.1.0) 2.0.0 > async topic deletion conflicts with max topic length > > > Key: KAFKA-4893 > URL: https://issues.apache.org/jira/browse/KAFKA-4893 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > As per the > [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], > topics can be only 249 characters long to line up with typical filesystem > limitations: > {quote} > Each sharded partition log is placed into its own folder under the Kafka log > directory. The name of such folders consists of the topic name, appended by a > dash (\-) and the partition id. Since a typical folder name can not be over > 255 characters long, there will be a limitation on the length of topic names. > We assume the number of partitions will not ever be above 100,000. Therefore, > topic names cannot be longer than 249 characters. This leaves just enough > room in the folder name for a dash and a potentially 5 digit long partition > id. > {quote} > {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during > validation. > This limit ends up not being quite right since topic deletion ends up > renaming the directory to the form {{topic-partition.uniqueId-delete}} as can > be seen in {{LogManager.asyncDelete}}: > {code} > val dirName = new StringBuilder(removedLog.name) > .append(".") > > .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) > .append(Log.DeleteDirSuffix) > .toString() > {code} > So the unique id and "-delete" suffix end up hogging some of the characters. > Deleting a long-named topic results in a log message such as the following: > {code} > kafka.common.KafkaStorageException: Failed to rename log directory from > /tmp/kafka-logs0/0-0 > to > /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete > at kafka.log.LogManager.asyncDelete(LogManager.scala:439) > at > kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) > at kafka.cluster.Partition.delete(Partition.scala:137) > at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259) > at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174) > at kafka.server.KafkaApis.handle(KafkaApis.scala:86) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64) > at java.lang.Thread.run(Thread.java:745) > {code} > The topic after this point still exists but has Leader set to -1 and the > controller recognizes the topic completion as incomplete (the topic znode is > still in /admin/delete_topics). 
> I don't believe LinkedIn has any topic name this long, but I'm making the > ticket in case anyone runs into this problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
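A quick back-of-the-envelope check of the numbers above, assuming the 255-character folder-name limit cited from the docs: the deletion rename appends "." plus a 32-character UUID (dashes removed) plus "-delete", i.e. 40 characters, so a topic name at the documented 249-character cap cannot survive deletion:
{code}
public class TopicNameBudget {
    public static void main(String[] args) {
        int fsLimit = 255;                              // typical folder-name limit
        int dashAndPartition = 1 + 5;                   // "-" plus up to 5-digit partition id
        int deleteSuffix = 1 + 32 + "-delete".length(); // "." + UUID + "-delete" = 40 chars
        System.out.println(fsLimit - dashAndPartition);                // 249: documented cap
        System.out.println(fsLimit - dashAndPartition - deleteSuffix); // 209: cap surviving deletion
    }
}
{code}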
[jira] [Updated] (KAFKA-3999) Consumer bytes-fetched metric uses decompressed message size
[ https://issues.apache.org/jira/browse/KAFKA-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3999: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Consumer bytes-fetched metric uses decompressed message size > > > Key: KAFKA-3999 > URL: https://issues.apache.org/jira/browse/KAFKA-3999 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Jason Gustafson >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 2.0.0 > > > It looks like the computation for the bytes-fetched metrics uses the size of > the decompressed message set. I would have expected it to be based off of the > raw size of the fetch responses. Perhaps it would be helpful to expose both > the raw and decompressed fetch sizes? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4931) stop script fails due 4096 ps output limit
[ https://issues.apache.org/jira/browse/KAFKA-4931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4931: -- Fix Version/s: (was: 1.1.0) 2.0.0 > stop script fails due 4096 ps output limit > -- > > Key: KAFKA-4931 > URL: https://issues.apache.org/jira/browse/KAFKA-4931 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.2.0 >Reporter: Amit Jain >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > When run, the script bin/zookeeper-server-stop.sh fails to stop the ZooKeeper > server process if the ps output exceeds the 4096-character limit of Linux. I > think that instead of ps we can use ${JAVA_HOME}/bin/jps -vl | grep QuorumPeerMain, > which would correctly stop the ZooKeeper process. Currently we obtain the PIDs to kill with > PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk > '{print $1}') -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3733) Avoid long command lines by setting CLASSPATH in environment
[ https://issues.apache.org/jira/browse/KAFKA-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3733: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Avoid long command lines by setting CLASSPATH in environment > > > Key: KAFKA-3733 > URL: https://issues.apache.org/jira/browse/KAFKA-3733 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Adrian Muraru >Assignee: Adrian Muraru >Priority: Minor > Fix For: 2.0.0 > > > {{kafka-run-class.sh}} sets the JVM classpath in the command line via {{-cp}}. > This generates long command lines that get trimmed by the shell in commands > like ps, pgrep, etc. > An alternative is to set the CLASSPATH in the environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3575) Use console consumer access topic that does not exist, can not use "Control + C" to exit process
[ https://issues.apache.org/jira/browse/KAFKA-3575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3575: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Use console consumer access topic that does not exist, can not use "Control + > C" to exit process > > > Key: KAFKA-3575 > URL: https://issues.apache.org/jira/browse/KAFKA-3575 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 > Environment: SUSE Linux Enterprise Server 11 SP3 >Reporter: NieWang >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.0.0 > > > 1. Use "sh kafka-console-consumer.sh --zookeeper 10.252.23.133:2181 --topic > topic_02" to start the console consumer. topic_02 does not exist. > 2. You cannot use "Control + C" to exit the console consumer process. The > process is blocked. > 3. Use jstack to check the process stack, as follows: > linux:~ # jstack 122967 > 2016-04-18 15:46:06 > Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode): > "Attach Listener" #29 daemon prio=9 os_prio=0 tid=0x01781800 > nid=0x1e0c8 waiting on condition [0x] >java.lang.Thread.State: RUNNABLE > "Thread-4" #27 prio=5 os_prio=0 tid=0x018a4000 nid=0x1e08a waiting on > condition [0x7ffbe5ac] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xe00ed3b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101) > "SIGINT handler" #28 daemon prio=9 os_prio=0 tid=0x019d5800 > nid=0x1e089 in Object.wait() [0x7ffbe5bc1000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.$$YJP$$wait(Native Method) > at java.lang.Object.wait(Object.java) > at java.lang.Thread.join(Thread.java:1245) > - locked <0xe71fd4e8> (a kafka.tools.ConsoleConsumer$$anon$1) > at java.lang.Thread.join(Thread.java:1319) > at > java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106) > at > java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46) > at java.lang.Shutdown.runHooks(Shutdown.java:123) > at java.lang.Shutdown.sequence(Shutdown.java:167) > at java.lang.Shutdown.exit(Shutdown.java:212) > - locked <0xe00abfd8> (a java.lang.Class for > java.lang.Shutdown) > at java.lang.Terminator$1.handle(Terminator.java:52) > at sun.misc.Signal$1.run(Signal.java:212) > at java.lang.Thread.run(Thread.java:745) > "metrics-meter-tick-thread-2" #20 daemon prio=5 os_prio=0 > tid=0x7ffbec77a800 nid=0x1e079 waiting on condition [0x7ffbe66c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xe6fa6438> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at >
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > "metrics-meter-tick-thread-1" #19 daemon prio=5 os_prio=0 > tid=0x7ffbec783000 nid=0x1e078 waiting on condition [0x7ffbe67c9000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xe6fa6438> (a >
[jira] [Updated] (KAFKA-5914) Return MessageFormatVersion and MessageMaxBytes in MetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-5914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5914: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Return MessageFormatVersion and MessageMaxBytes in MetadataResponse > --- > > Key: KAFKA-5914 > URL: https://issues.apache.org/jira/browse/KAFKA-5914 > Project: Kafka > Issue Type: Sub-task >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 2.0.0 > > > As part of KIP-192, we want to send two additional fields in the > {{TopicMetadata}} which is part of the {{MetadataResponse}}. These fields are > the {{MessageFormatVersion}} and the {{MessageMaxBytes}}. > The {{MessageFormatVersion}} is required to implement > https://issues.apache.org/jira/browse/KAFKA-5794 . The latter will be > implemented in a future release, but with the changes proposed here, the said > future release will be backward compatible with 1.0.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5952) Refactor Consumer Fetcher metrics
[ https://issues.apache.org/jira/browse/KAFKA-5952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5952: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Refactor Consumer Fetcher metrics > - > > Key: KAFKA-5952 > URL: https://issues.apache.org/jira/browse/KAFKA-5952 > Project: Kafka > Issue Type: Sub-task >Reporter: James Cheng >Assignee: James Cheng >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5951) Autogenerate Producer RecordAccumulator metrics
[ https://issues.apache.org/jira/browse/KAFKA-5951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5951: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Autogenerate Producer RecordAccumulator metrics > --- > > Key: KAFKA-5951 > URL: https://issues.apache.org/jira/browse/KAFKA-5951 > Project: Kafka > Issue Type: Sub-task >Reporter: James Cheng >Assignee: James Cheng >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6390) Update ZooKeeper to 3.4.11, Gradle and other minor updates
[ https://issues.apache.org/jira/browse/KAFKA-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-6390: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Update ZooKeeper to 3.4.11, Gradle and other minor updates > -- > > Key: KAFKA-6390 > URL: https://issues.apache.org/jira/browse/KAFKA-6390 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2614 is a helpful fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5944) Add unit tests for handling of authentication failures in clients
[ https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5944: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Add unit tests for handling of authentication failures in clients > - > > Key: KAFKA-5944 > URL: https://issues.apache.org/jira/browse/KAFKA-5944 > Project: Kafka > Issue Type: Test > Components: clients >Reporter: Rajini Sivaram >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.0.0 > > > KAFKA-5854 improves the handling of authentication failures in clients and has added > integration tests and some basic client-side tests that create actual > connections to a mock server. It would be good to add a set of tests for > producers, consumers, etc. that use MockClient to cover various scenarios > more extensively. > cc [~hachikuji] [~vahid] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)
[ https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5886: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Introduce delivery.timeout.ms producer config (KIP-91) > -- > > Key: KAFKA-5886 > URL: https://issues.apache.org/jira/browse/KAFKA-5886 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sumant Tambe >Assignee: Sumant Tambe >Priority: Major > Fix For: 2.0.0 > > > We propose adding a new timeout delivery.timeout.ms. The window of > enforcement includes batching in the accumulator, retries, and the inflight > segments of the batch. With this config, the user has a guaranteed upper > bound on when a record will either get sent, fail or expire from the point > when send returns. In other words we no longer overload request.timeout.ms to > act as a weak proxy for accumulator timeout and instead introduce an explicit > timeout that users can rely on without exposing any internals of the producer > such as the accumulator. > See > [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer] > for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
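For illustration, here is a sketch of how the proposed config would be set once it ships as described in the KIP; the config constant did not exist at the time of this ticket, so the key is spelled out as a string:
{code}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class DeliveryTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Proposed upper bound from the moment send() returns until the record
        // is sent, fails, or expires: covers batching, retries, and inflight time.
        props.put("delivery.timeout.ms", "120000");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
{code}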
[jira] [Updated] (KAFKA-5637) Document compatibility and release policies
[ https://issues.apache.org/jira/browse/KAFKA-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5637: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Document compatibility and release policies > --- > > Key: KAFKA-5637 > URL: https://issues.apache.org/jira/browse/KAFKA-5637 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Ismael Juma >Assignee: Sönke Liebau >Priority: Major > Fix For: 2.0.0 > > > We should document our compatibility and release policies in one place so > that people have the correct expectations. This is generally important, but > more so now that we are releasing 1.0.0. > I extracted the following topics from the mailing list thread as the ones > that should be documented as a minimum: > *Code stability* > * Explanation of stability annotations and their implications > * Explanation of what public apis are > * *Discussion point: * Do we want to keep the _unstable_ annotation or is > _evolving_ sufficient going forward? > *Support duration* > * How long are versions supported? > * How far are bugfixes backported? > * How far are security fixes backported? > * How long are protocol versions supported by subsequent code versions? > * How long are older clients supported? > * How long are older brokers supported? > I will create an initial pull request to add a section to the documentation > as basis for further discussion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods
[ https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5445: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Document exceptions thrown by AdminClient methods > - > > Key: KAFKA-5445 > URL: https://issues.apache.org/jira/browse/KAFKA-5445 > Project: Kafka > Issue Type: Improvement > Components: admin, clients >Reporter: Ismael Juma >Assignee: Andrey Dyachkov >Priority: Major > Fix For: 2.0.0, 1.0.2 > > > AdminClient should document the exceptions that users may have to handle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5403) Transactions system test should dedup consumed messages by offset
[ https://issues.apache.org/jira/browse/KAFKA-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5403: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Transactions system test should dedup consumed messages by offset > - > > Key: KAFKA-5403 > URL: https://issues.apache.org/jira/browse/KAFKA-5403 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 2.0.0 > > > In KAFKA-5396, we saw that the consumers which verify the data in multiple > topics could read the same offsets multiple times, for instance when a > rebalance happens. > This would detect spurious duplicates, causing the test to fail. We should > dedup the consumed messages by offset and only fail the test if we have > duplicate values for a unique set of offsets. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.
[ https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4701: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Allow kafka brokers to dynamically reload truststore without restarting. > > > Key: KAFKA-4701 > URL: https://issues.apache.org/jira/browse/KAFKA-4701 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Allen Xiang >Priority: Major > Labels: security > Fix For: 2.0.0 > > > Right now, in order to add SSL clients (update broker truststores), a rolling > restart of all brokers is required. This is very time consuming and > unnecessary. A dynamic truststore manager is needed to reload the truststore > from the file system without restarting brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
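For illustration, reloading a truststore from disk is straightforward with plain JSSE; the request is essentially for the broker to do the equivalent internally. A generic sketch, not Kafka broker code:
{code}
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;

public class TruststoreReloader {
    // Re-reads the truststore file and builds a fresh SSLContext from it.
    static SSLContext reload(String path, char[] password) throws Exception {
        KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
        try (FileInputStream in = new FileInputStream(path)) {
            ts.load(in, password);
        }
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(ts);
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null);
        return ctx;
    }
}
{code}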
[jira] [Updated] (KAFKA-5272) Improve validation for Alter Configs (KIP-133)
[ https://issues.apache.org/jira/browse/KAFKA-5272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5272: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Improve validation for Alter Configs (KIP-133) > -- > > Key: KAFKA-5272 > URL: https://issues.apache.org/jira/browse/KAFKA-5272 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > TopicConfigHandler.processConfigChanges() warns about certain topic configs. > We should include such validations in alter configs and reject the change if > the validation fails. Note that this should be done without changing the > behaviour of the ConfigCommand (as it does not have access to the broker > configs). > We should consider adding other validations like KAFKA-4092 and KAFKA-4680. > Finally, the AlterConfigsPolicy mentioned in KIP-133 will be added at the > same time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5276) Support derived and prefixed configs in DescribeConfigs (KIP-133)
[ https://issues.apache.org/jira/browse/KAFKA-5276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5276: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Support derived and prefixed configs in DescribeConfigs (KIP-133) > - > > Key: KAFKA-5276 > URL: https://issues.apache.org/jira/browse/KAFKA-5276 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > The broker supports config overrides per listener. The way we do that is by > prefixing the configs with the listener name. These configs are not defined > by ConfigDef and they don't appear in `values()`. They do appear in > `originals()`. We should change the code so that we return these configs. > Because these configs are read-only, nothing needs to be done for > AlterConfigs. > With regards to derived configs, an example is advertised.listeners, which > falls back to listeners. This is currently done outside AbstractConfig. We > should look into including these into AbstractConfig so that the fallback > happens for the returned configs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5029) cleanup javadocs and logging
[ https://issues.apache.org/jira/browse/KAFKA-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5029: -- Fix Version/s: (was: 1.1.0) 2.0.0 > cleanup javadocs and logging > > > Key: KAFKA-5029 > URL: https://issues.apache.org/jira/browse/KAFKA-5029 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > Remove state change logger, splitting it up into the controller logs or > broker logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4914: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Partition re-assignment tool should check types before persisting state in > ZooKeeper > > > Key: KAFKA-4914 > URL: https://issues.apache.org/jira/browse/KAFKA-4914 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 0.10.1.1 >Reporter: Nick Travers >Assignee: Nick Travers >Priority: Major > Fix For: 2.0.0 > > > The partition-reassignment tool currently allows non-type-safe information to > be persisted into ZooKeeper, which can result in a ClassCastException at > runtime for brokers. > Specifically, this occurred when the broker assignment field was a List of > Strings, instead of a List of Integers. > {code} > 2017-03-15 01:44:04,572 ERROR > [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] > controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener > on Controller 10]: Error while handling broker changes > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at > kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436) > at > scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93) > at scala.collection.immutable.List.exists(List.scala:84) > at > kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436) > at > kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435) > at > scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) > at > scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247) > at > scala.collection.TraversableLike$class.filter(TraversableLike.scala:259) > at scala.collection.AbstractTraversable.filter(Traversable.scala:104) > at > kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355) > at
org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-2435) More optimally balanced partition assignment strategy
[ https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-2435: -- Fix Version/s: (was: 1.1.0) 2.0.0 > More optimally balanced partition assignment strategy > - > > Key: KAFKA-2435 > URL: https://issues.apache.org/jira/browse/KAFKA-2435 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-2435.patch > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the original high-level consumer. For the new consumer, > see KAFKA-3297. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
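Read as pseudocode, the strategy described above could look like the following greedy sketch; the types and names are simplified assumptions, and the real implementation would sit behind Kafka's assignor interfaces:
{code}
import java.util.*;

public class FairAssignmentSketch {
    // consumerSubscriptions: consumer -> topics it subscribes to
    // topicPartitionCounts:  topic -> number of partitions
    static Map<String, List<String>> assign(Map<String, Set<String>> consumerSubscriptions,
                                            Map<String, Integer> topicPartitionCounts) {
        Map<String, List<String>> assignment = new HashMap<>();
        consumerSubscriptions.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));

        // Ordering rules: topics with fewer consumers first;
        // ties broken in favor of the topic with more partitions.
        List<String> topics = new ArrayList<>(topicPartitionCounts.keySet());
        topics.sort(Comparator
                .comparingLong((String t) -> consumerSubscriptions.values().stream()
                        .filter(s -> s.contains(t)).count())
                .thenComparing(Comparator.comparingInt(
                        (String t) -> topicPartitionCounts.get(t)).reversed()));

        for (String topic : topics) {
            for (int p = 0; p < topicPartitionCounts.get(topic); p++) {
                // Assign to the interested consumer with the fewest partitions so far.
                String chosen = null;
                for (Map.Entry<String, Set<String>> e : consumerSubscriptions.entrySet()) {
                    if (!e.getValue().contains(topic)) continue;
                    if (chosen == null
                            || assignment.get(e.getKey()).size() < assignment.get(chosen).size()) {
                        chosen = e.getKey();
                    }
                }
                if (chosen != null) assignment.get(chosen).add(topic + "-" + p);
            }
        }
        return assignment;
    }
}
{code}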
[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)
[ https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3297: -- Fix Version/s: (was: 1.1.0) 2.0.0 > More optimally balanced partition assignment strategy (new consumer) > > > Key: KAFKA-3297 > URL: https://issues.apache.org/jira/browse/KAFKA-3297 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.0.0 > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the new consumer. For the original high-level consumer, > see KAFKA-2435. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356998#comment-16356998 ] Damian Guy commented on KAFKA-4879: --- [~hachikuji] is this going to make it for 1.1? > KafkaConsumer.position may hang forever when deleting a topic > - > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.0 >Reporter: Shixiong Zhu >Assignee: Jason Gustafson >Priority: Major > Fix For: 1.1.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
>
> public class KafkaReproducer {
>     public static void main(String[] args) {
>         // Make sure "delete.topic.enable" is set to true.
>         // Please create the topic test with "3" partitions manually.
>         // The issue is gone when there is only one partition.
>         String topic = "test";
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "testgroup");
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("enable.auto.commit", "false");
>         KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>         kc.subscribe(Collections.singletonList(topic));
>         kc.poll(0);
>         Set<TopicPartition> partitions = kc.assignment();
>         System.out.println("partitions: " + partitions);
>         kc.pause(partitions);
>         kc.seekToEnd(partitions);
>         System.out.println("please delete the topic in 30 seconds");
>         try {
>             // Sleep 30 seconds to give us enough time to delete the topic.
>             Thread.sleep(30000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         System.out.println("sleep end");
>         for (TopicPartition p : partitions) {
>             System.out.println(p + " offset: " + kc.position(p));
>         }
>         System.out.println("cannot reach here");
>         kc.close();
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer
[ https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4307: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Inconsistent parameters between console producer and consumer > - > > Key: KAFKA-4307 > URL: https://issues.apache.org/jira/browse/KAFKA-4307 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.1.0 >Reporter: Gwen Shapira >Assignee: Balint Molnar >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > kafka-console-producer uses --broker-list while kafka-console-consumer uses > --bootstrap-server. > Let's add --bootstrap-server to the producer for some consistency? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling
[ https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4665: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Inconsistent handling of non-existing topics in offset fetch handling > - > > Key: KAFKA-4665 > URL: https://issues.apache.org/jira/browse/KAFKA-4665 > Project: Kafka > Issue Type: Bug > Components: consumer, core >Reporter: Jason Gustafson >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.0.0 > > > For version 0 of the offset fetch API, the broker returns > UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at > the time of fetching. In later versions, we skip this check. We do, however, > continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. > if the principal does not have Describe access to the corresponding topic). > We should probably make this behavior consistent across versions. > Note also that currently the consumer raises {{KafkaException}} when it > encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, > which is inconsistent with how we usually handle this error. This probably > doesn't cause any problems currently only because of the inconsistency > mentioned in the first paragraph above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time
[ https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4862: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Kafka client connect to a shutdown node will block for a long time > -- > > Key: KAFKA-4862 > URL: https://issues.apache.org/jira/browse/KAFKA-4862 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.2.0 >Reporter: Pengwei >Assignee: Pengwei >Priority: Major > Fix For: 2.0.0 > > > Currently in our test env, we found that after one of the broker nodes crashes (reboot > or OS crash), the client may still be connecting to the crashed node to send > metadata requests or other requests, and it needs several minutes to > become aware that the connection has timed out before trying another node to send the > request. So the client may still not be aware of the metadata change after > several minutes. > We don't have a connection timeout for the network client; we should add a > connection timeout for the client -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4249) Document how to customize GC logging options for broker
[ https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4249: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Document how to customize GC logging options for broker > --- > > Key: KAFKA-4249 > URL: https://issues.apache.org/jira/browse/KAFKA-4249 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 0.10.0.1 >Reporter: Jim Hoagland >Assignee: Tom Bentley >Priority: Major > Fix For: 2.0.0 > > > We wanted to enable GC logging for Kafka broker and saw that you can set > GC_LOG_ENABLED=true. However, this didn't do what we wanted. For example, > the GC log will be overwritten every time the broker gets restarted. It > wasn't clear how we could do that (no documentation of it that I can find), > so I did some research by looking at the source code and did some testing and > found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to > starting broker. I posted my solution to StackOverflow: > > http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove > (feel free to critique) > That solution is now public, but it seems like the Kafka documentation should > say how to do this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4203) Java producer default max message size does not align with broker default
[ https://issues.apache.org/jira/browse/KAFKA-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4203: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Java producer default max message size does not align with broker default > - > > Key: KAFKA-4203 > URL: https://issues.apache.org/jira/browse/KAFKA-4203 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Grant Henke >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > > The Java producer sets max.request.size = 1048576 (the base 2 version of 1 MB > (MiB)) > The broker sets max.message.bytes = 1000012 (the base 10 value of 1 MB + 12 > bytes for overhead) > This means that by default the producer can try to produce messages larger > than the broker will accept, resulting in RecordTooLargeExceptions. > There were no similar issues in the old producer because it sets > max.message.size = 1000000 (the base 10 value of 1 MB) > I propose we increase the broker default for max.message.bytes to 1048588 > (the base 2 value of 1 MB (MiB) + 12 bytes for overhead) so that any message > produced with default configs from either producer does not result in a > RecordTooLargeException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
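A quick check of the mismatch described above, using the documented defaults:
{code}
public class MessageSizeDefaults {
    public static void main(String[] args) {
        int producerMax = 1048576; // max.request.size: 1 MiB
        int brokerMax = 1000012;   // max.message.bytes: 1 MB + 12 bytes overhead
        System.out.println("producer can exceed broker default by "
                + (producerMax - brokerMax) + " bytes");
        System.out.println("proposed broker default: " + (1048576 + 12)); // 1048588
    }
}
{code}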
[jira] [Updated] (KAFKA-3554) Generate actual data with specific compression ratio and add multi-thread support in the ProducerPerformance tool.
[ https://issues.apache.org/jira/browse/KAFKA-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3554: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Generate actual data with specific compression ratio and add multi-thread > support in the ProducerPerformance tool. > -- > > Key: KAFKA-3554 > URL: https://issues.apache.org/jira/browse/KAFKA-3554 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.1 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > Fix For: 2.0.0 > > > Currently the ProducerPerformance always generates the payload with the same > bytes. This does not work well for testing compressed data because the > payload is extremely compressible no matter how big the payload is. > We can make some changes to make it more useful for compressed messages. > Currently I am generating the payload containing integers from a given range. > By adjusting the range of the integers, we can get different compression > ratios. > API-wise, we can either let the user specify the integer range or the expected > compression ratio (we will do some probing to get the corresponding range for > the user) > Besides that, in many cases, it is useful to have multiple producer threads > when the producer threads themselves are the bottleneck. Admittedly people can > run multiple ProducerPerformance instances to achieve a similar result, but it > is still different from the real case when people actually use the producer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
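A minimal sketch of the idea described above, not the actual tool code: payloads drawn from a narrow integer range compress well, while a wide range compresses poorly, so the range acts as a knob for the compression ratio.
{code}
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class CompressiblePayload {
    // Builds a payload of sizeBytes from random integers in [0, integerRange).
    static byte[] payload(int sizeBytes, int integerRange, Random random) {
        StringBuilder sb = new StringBuilder(sizeBytes);
        while (sb.length() < sizeBytes) {
            sb.append(random.nextInt(integerRange)).append(' ');
        }
        return sb.substring(0, sizeBytes).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Random r = new Random(42);
        byte[] highlyCompressible = payload(1024, 10, r);         // few distinct tokens
        byte[] poorlyCompressible = payload(1024, 1_000_000, r);  // many distinct tokens
        System.out.println(highlyCompressible.length + " / " + poorlyCompressible.length);
    }
}
{code}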
[jira] [Updated] (KAFKA-3689) Exception when attempting to decrease connection count for address with no connections
[ https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3689: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Exception when attempting to decrease connection count for address with no > connections > -- > > Key: KAFKA-3689 > URL: https://issues.apache.org/jira/browse/KAFKA-3689 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 0.9.0.1 > Environment: ubuntu 14.04, > java version "1.7.0_95" > OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2) > OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode) > 3 broker cluster (all 3 servers identical - Intel Xeon E5-2670 @2.6GHz, > 8cores, 16 threads 64 GB RAM & 1 TB Disk) > Kafka Cluster is managed by 3 server ZK cluster (these servers are different > from Kafka broker servers). All 6 servers are connected via 10G switch. > Producers run from external servers. >Reporter: Buvaneswari Ramanan >Assignee: Ryan P >Priority: Major > Fix For: 2.0.0 > > Attachments: KAFKA-3689.log.redacted, kafka-3689-instrumentation.patch > > Original Estimate: 72h > Remaining Estimate: 72h > > As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org > with the same subject, I am creating this bug report. > The following error occurs in one of the brokers in our 3 broker cluster, > which serves about 8000 topics. These topics are single partitioned with a > replication factor = 3. Each topic gets data at a low rate – 200 bytes per > sec. Leaders are balanced across the topics. > Producers run from external servers (4 Ubuntu servers with same config as the > brokers), each producing to 2000 topics utilizing kafka-python library. > This error message occurs repeatedly in one of the servers. Between the hours > of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such > occurrences. This was right after a cluster restart. > This is not the first time we got this error in this broker. In those > instances, error occurred hours / days after cluster restart. > = > [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. > (kafka.network.Processor) > java.lang.IllegalArgumentException: Attempted to decrease connection count > for address with no connections, address: /X.Y.Z.144 (actual network address > masked) > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > at scala.collection.AbstractMap.getOrElse(Map.scala:59) > at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.run(SocketServer.scala:445) > at java.lang.Thread.run(Thread.java:745) > [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor) > java.lang.IllegalArgumentException: Attempted to decrease connection count > for address with no connections, address: /X.Y.Z.144 > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at > kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565) > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > at scala.collection.AbstractMap.getOrElse(Map.scala:59) > at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450) > at > kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.run(SocketServer.scala:445) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()
[ https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3190: -- Fix Version/s: (was: 1.1.0) 2.0.0 > KafkaProducer should not invoke callback in send() > -- > > Key: KAFKA-3190 > URL: https://issues.apache.org/jira/browse/KAFKA-3190 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 2.0.0 > > > Currently KafkaProducer will invoke callback.onComplete() if it receives an > ApiException during send(). This breaks the guarantee that callback will be > invoked in order. It seems ApiException in send() only comes from metadata > refresh. If so, we can probably simply throw it instead of invoking > callback(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers
[ https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3438: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Rack Aware Replica Reassignment should warn of overloaded brokers > - > > Key: KAFKA-3438 > URL: https://issues.apache.org/jira/browse/KAFKA-3438 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.0.0 >Reporter: Ben Stopford >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.0.0 > > > We've changed the replica reassignment code to be rack aware. > One problem that might catch users out would be that they rebalance the > cluster using kafka-reassign-partitions.sh but their rack configuration means > that some high proportion of replicas are pushed onto a single, or small > number of, brokers. > This should be an easy problem to avoid, by changing the rack assignment > information, but we should probably warn users if they are going to create > something that is unbalanced. > So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack > awareness enabled. If I add a 13th machine, on a new rack, and run the > rebalance tool, that new machine will get ~6x as many replicas as the least > loaded broker. > Suggest a warning be added to the tool output when --generate is called. > "The most loaded broker has 2.3x as many replicas as the least loaded > broker. This is likely due to an uneven distribution of brokers across racks. > You're advised to alter the rack config so there are approximately the same > number of brokers per rack", which also displays the individual rack→#brokers and > broker→#replicas data for the proposed move. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
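A sketch of the suggested warning check; the 1.5x threshold is an assumption for illustration, not from the ticket:
{code}
import java.util.Map;

public class RebalanceWarning {
    // Warns when the proposed assignment is heavily skewed across brokers.
    static void warnIfSkewed(Map<Integer, Integer> brokerToReplicaCount) {
        int max = brokerToReplicaCount.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        int min = brokerToReplicaCount.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        if (min > 0 && (double) max / min > 1.5) {
            System.out.printf("Warning: the most loaded broker has %.1fx as many replicas as "
                    + "the least loaded broker. This is likely due to an uneven distribution "
                    + "of brokers across racks.%n", (double) max / min);
        }
    }
}
{code}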
[jira] [Updated] (KAFKA-3177) Kafka consumer can hang when position() is called on a non-existing partition.
[ https://issues.apache.org/jira/browse/KAFKA-3177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-3177: -- Fix Version/s: (was: 1.1.0) 2.0.0 > Kafka consumer can hang when position() is called on a non-existing partition. > -- > > Key: KAFKA-3177 > URL: https://issues.apache.org/jira/browse/KAFKA-3177 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jason Gustafson >Priority: Critical > Fix For: 2.0.0 > > > This can be easily reproduced as follows: > {code} > consumer.assign(Collections.singletonList(someNonExistingTopicPartition)); > consumer.position(someNonExistingTopicPartition); > {code} > It seems when position() is called we will try to do the following: > 1. Fetch committed offsets. > 2. If there are no committed offsets, try to reset the offset using the reset > strategy. In sendListOffsetRequest(), if the consumer does not know the > TopicPartition, it will refresh its metadata and retry. In this case, because > the partition does not exist, we fall into the infinite loop of refreshing > topic metadata. > Another orthogonal issue is that if the topic in the above code piece does > not exist, the position() call will actually create the topic due to the fact > that currently a topic metadata request can automatically create the topic. > This is a known separate issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-6542: - Description: At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail. This is also related to bootstrapping KTables (which is what a GKTable does). Related to: KAFKA-4113 Allow KTable bootstrap was: At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail. This is also related to bootstrapping KTables (which is what a GKTable does). Related to: KAFKA-6543 Allow KTables to be bootstrapped at start up, like GKTables > Tables should trigger joins too, not just streams > - > > Key: KAFKA-6542 > URL: https://issues.apache.org/jira/browse/KAFKA-6542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > > At the moment it's quite possible to have a race condition when joining a > stream with a table, if the stream event arrives first, before the table > event, in which case the join will fail. > This is also related to bootstrapping KTables (which is what a GKTable does). > Related to: KAFKA-4113 Allow KTable bootstrap -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-6542: - Description: At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail. This is also related to bootstrapping KTables (which is what a GKTable does). Related to: KAFKA-6543 Allow KTables to be bootstrapped at start up, like GKTables was: At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail. This is also related to bootstrapping KTables (which is what a GKTable does). > Tables should trigger joins too, not just streams > - > > Key: KAFKA-6542 > URL: https://issues.apache.org/jira/browse/KAFKA-6542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > > At the moment it's quite possible to have a race condition when joining a > stream with a table, if the stream event arrives first, before the table > event, in which case the join will fail. > This is also related to bootstrapping KTables (which is what a GKTable does). > Related to: KAFKA-6543 Allow KTables to be bootstrapped at start up, like > GKTables -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables
[ https://issues.apache.org/jira/browse/KAFKA-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-6543: - Affects Version/s: 1.1.0 Description: In some use cases, it's desirable to have KTables "fully" bootstrapped (at least on a best-effort basis) before the topology begins, similar to how a GKTable does. This could prevent join race conditions for one, which could be a big problem if local KTable state has been lost. Related to KAFKA-6542 Tables should trigger joins too, not just streams was: In some use cases, it's desirable to have KTables "fully" bootstrapped (at least on a best-effort basis) before the topology begins, similar to how a GKTable does. This could prevent join race conditions for one, which could be a big problem if local KTable state has been lost. Component/s: streams > Allow KTables to be bootstrapped at start up, like GKTables > --- > > Key: KAFKA-6543 > URL: https://issues.apache.org/jira/browse/KAFKA-6543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > > In some use cases, it's desirable to have KTables "fully" bootstrapped (at > least on a best-effort basis) before the topology begins, similar to how a GKTable > does. This could prevent join race conditions for one, which could be a big > problem if local KTable state has been lost. > > Related to KAFKA-6542 Tables should trigger joins too, not just streams -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables
Antony Stubbs created KAFKA-6543: Summary: Allow KTables to be bootstrapped at start up, like GKTables Key: KAFKA-6543 URL: https://issues.apache.org/jira/browse/KAFKA-6543 Project: Kafka Issue Type: Improvement Reporter: Antony Stubbs In some use cases, it's desirable to have KTables "fully" bootstrapped (at least on a best-effort basis) before the topology begins, similar to how a GKTable does. This could prevent join race conditions for one, which could be a big problem if local KTable state has been lost. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6542) Tables should trigger joins too, not just streams
Antony Stubbs created KAFKA-6542: Summary: Tables should trigger joins too, not just streams Key: KAFKA-6542 URL: https://issues.apache.org/jira/browse/KAFKA-6542 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.1.0 Reporter: Antony Stubbs At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail. This is also related to bootstrapping KTables (which is what a GKTable does). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5165) Kafka Logs Cleanup Not happening, Huge File Growth - Windows
[ https://issues.apache.org/jira/browse/KAFKA-5165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356937#comment-16356937 ] Akash C commented on KAFKA-5165: We are facing a similar issue in 0.11.0.0. Is there a workaround for this? > Kafka Logs Cleanup Not happening, Huge File Growth - Windows > > > Key: KAFKA-5165 > URL: https://issues.apache.org/jira/browse/KAFKA-5165 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 > Environment: windows, Kafka Server(Version: 0.9.0.1) >Reporter: Manikandan P >Priority: Major > > We had set the below configuration: Retention hours as 1, Retention bytes as > 150 MB in the server.properties in the Kafka Server (Version: 0.9.0.1). We also > modified other settings as given below. > log.dirs=/tmp/kafka-logs > log.retention.hours=1 > log.retention.bytes=157286400 > log.segment.bytes=1073741824 > log.retention.check.interval.ms=30 > log.cleaner.enable=true > log.cleanup.policy=delete > After a few days, the Kafka log folder had grown to a huge 13.2 GB. We > have seen that the topic offset gets updated and ignores the old data, but the log > file doesn't shrink and still has all the old data, becoming too huge. Could you > help us find out why Kafka is not deleting the logs (physically)? Do we > need to change any configuration? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text
[ https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356842#comment-16356842 ] ASF GitHub Bot commented on KAFKA-6424: --- h314to opened a new pull request #4549: KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text URL: https://github.com/apache/kafka/pull/4549 * Remove the .toLowerCase call in the word count stream * Add a method to read text from a file, and move the text to a resources file * Minor cleanup: change argument order in assertEquals ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > QueryableStateIntegrationTest#queryOnRebalance should accept raw text > - > > Key: KAFKA-6424 > URL: https://issues.apache.org/jira/browse/KAFKA-6424 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Filipe Agapito >Priority: Minor > Labels: newbie, unit-test > > I was using QueryableStateIntegrationTest#queryOnRebalance for some > performance testing by adding more sentences to inputValues. > I found that when a sentence contains an upper-case letter, the test would > time out. > I got around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} > before the split. > Ideally we could specify the path to a text file which contains the text. The > test can read the text file and generate the input array. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text
[ https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356836#comment-16356836 ] Filipe Agapito commented on KAFKA-6424: --- This happens because {{createCountStream}} is using {{value.toLowerCase(Locale.getDefault())}}, so an uppercase word never shows up in the stream and {{verifyAllKVKeys}} times out. I fixed it by removing the {{.toLowerCase}} call from {{createCountStream}}. In order to comply with the last requirement in the description, I've also added a method that reads a file in the test resources and generates the inputValues list (a sketch of the idea follows this message). I'll be creating a pull request shortly. > QueryableStateIntegrationTest#queryOnRebalance should accept raw text > - > > Key: KAFKA-6424 > URL: https://issues.apache.org/jira/browse/KAFKA-6424 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Filipe Agapito >Priority: Minor > Labels: newbie, unit-test > > I was using QueryableStateIntegrationTest#queryOnRebalance for some > performance testing by adding more sentences to inputValues. > I found that when a sentence contains an upper-case letter, the test would > time out. > I got around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} > before the split. > Ideally we could specify the path to a text file which contains the text. The > test can read the text file and generate the input array. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
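A minimal sketch of such a helper, assuming a hypothetical resource path and class name (this is not the actual patch in PR #4549):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class InputTextLoader {
    /**
     * Reads every line of a classpath resource (e.g. a file under
     * src/test/resources) and returns the lines as the test's input values.
     */
    public static List<String> readInputValues(String resourceName) throws IOException {
        List<String> inputValues = new ArrayList<>();
        try (InputStream in = InputTextLoader.class.getResourceAsStream(resourceName);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                inputValues.add(line); // raw text: no toLowerCase applied
            }
        }
        return inputValues;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical resource path, for illustration only.
        List<String> inputValues = readInputValues("/queryOnRebalance/input-values.txt");
        System.out.println(inputValues.size() + " sentences loaded");
    }
}
```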
[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356819#comment-16356819 ] Anh Le commented on KAFKA-6541: --- [~huxi_2b] I created [a PR|https://github.com/apache/kafka/pull/4548] for this issue. Please have a look at it. > StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread > -- > > Key: KAFKA-6541 > URL: https://issues.apache.org/jira/browse/KAFKA-6541 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Anh Le >Priority: Major > > There's something wrong with our client library when sending heart beats. > This bug seems to be identical to this one: > [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] > > Here's the log: > > {quote}2018-02-08 13:55:01,102 ERROR > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread > Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | > default-group': > java.lang.StackOverflowError: null > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398) > at ch.qos.logback.classic.Logger.info(Logger.java:583) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at >
[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356818#comment-16356818 ] ASF GitHub Bot commented on KAFKA-6541: --- anhldbk opened a new pull request #4548: KAFKA-6541: Fix a StackOverflow bug in kafka-coordinator-heartbeat-thread URL: https://github.com/apache/kafka/pull/4548 - The bug is caused by infinite recursive calls to `ConsumerNetworkClient.disconnect()` - In class `org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient`, I introduced a boolean field named `disconnecting` to prevent such calls from happening:
```java
public void disconnect(Node node) {
    synchronized (this) {
        // Guard against re-entrant calls: onFailure callbacks fired while
        // failing unsent requests can call disconnect() again.
        if (disconnecting) {
            return;
        }
        disconnecting = true;
        failUnsentRequests(node, DisconnectException.INSTANCE);
        client.disconnect(node.idString());
    }
    // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired
    pollNoWakeup();
}
```
No test to implement (I think this is a trivial fix) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread > -- > > Key: KAFKA-6541 > URL: https://issues.apache.org/jira/browse/KAFKA-6541 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Anh Le >Priority: Major > > There's something wrong with our client library when sending heartbeats. 
> This bug seems to be identical to this one: > [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] > > Here's the log: > > {quote}2018-02-08 13:55:01,102 ERROR > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread > Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | > default-group': > java.lang.StackOverflowError: null > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233) > at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398) > at ch.qos.logback.classic.Logger.info(Logger.java:583) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at >
[jira] [Updated] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anh Le updated KAFKA-6541: -- Description: There's something wrong with our client library when sending heart beats. This bug seems to be identical to this one: [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] Here's the log: {quote}2018-02-08 13:55:01,102 ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | default-group': java.lang.StackOverflowError: null at java.lang.StringBuilder.append(StringBuilder.java:136) at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302) at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271) at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233) at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173) at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398) at ch.qos.logback.classic.Logger.info(Logger.java:583) at org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) at
[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356756#comment-16356756 ] huxihx commented on KAFKA-6541: --- Seems `coordinatorDead` was recursively called which could be proved by the repeating pattern of line numbers (797 -> 653 -> 388 -> 416 -> ... -> 797 -> 653 -> ...) > StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread > -- > > Key: KAFKA-6541 > URL: https://issues.apache.org/jira/browse/KAFKA-6541 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Anh Le >Priority: Major > > There's something wrong with our client library when sending heart beats. > This bug seems to be identical to this one: > [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] > > Here's the log: > > {{2018-02-08 13:55:01,102 ERROR > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread > Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | > default-group':}} > {{java.lang.StackOverflowError: null}} > {{ at java.lang.StringBuilder.append(StringBuilder.java:136)}} > {{ at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)}} > {{ at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)}} > {{ at > org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)}} > {{ at > org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)}} > {{ at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)}} > {{ at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)}} > {{ at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)}} > {{ at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)}} > {{ at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)}} > {{ at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)}} > {{ at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)}} > {{ at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)}} > {{ at > ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)}} > {{ at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)}} > {{ at ch.qos.logback.classic.Logger.info(Logger.java:583)}} > {{ at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)}} > {{ at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)}} > {{ at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}} > {{ at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}} > {{ at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}} > {{ at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}} > {{ at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}} > {{ at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}} > {{ at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)}} > {{ at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)}} > {{ at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)}} > {{ at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}} > {{ at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}} > {{ at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}} > {{ at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}} > {{ at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}} > {{ at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}} > {{ at >
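huxihx's reading matches the trace. A self-contained Java sketch (simplified stand-in methods, not the real Kafka classes) of the four-frame cycle; running it throws the same StackOverflowError:

```java
// Stand-ins for the real call sites named in the trace:
// AbstractCoordinator.coordinatorDead (653) -> ConsumerNetworkClient.disconnect (388)
// -> failUnsentRequests (416) -> CoordinatorResponseHandler.onFailure (797) -> and back.
public class CoordinatorDeadLoop {

    void coordinatorDead() {
        disconnect(); // marks the coordinator dead, then disconnects it
    }

    void disconnect() {
        failUnsentRequests(); // failing requests fires their callbacks synchronously
    }

    void failUnsentRequests() {
        onFailure(); // each failed request's future invokes onFailure
    }

    void onFailure() {
        coordinatorDead(); // the failure handler declares the coordinator dead again
    }

    public static void main(String[] args) {
        new CoordinatorDeadLoop().coordinatorDead(); // throws java.lang.StackOverflowError
    }
}
```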
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 need to be transitioned to OfflineReplica state, possibly because of a broker going offline, the following call stack will occur on the controller, causing an iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Create a cluster with 2 brokers having ids 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4. _Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (10 x 10). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) of the logs above. c. At the bottom of the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. Performance improvement benchmark: if we perform the steps above with topic a0 having 5000 partitions and topic a1 having 5000 partitions, when broker 2 goes down it takes the controller ~4 minutes to go through controlled shutdown, detect the broker failure through ZooKeeper, and transition all replicas to OfflineReplica state. After applying the patch, the same process takes 23 seconds. The testing done: After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions.
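The direction the description implies can be sketched as follows (Java for illustration; the real controller code is Scala, and these names are simplified stand-ins, not the actual patch): mark beingDeleted via a set-membership check on just the partitions passed in, rather than scanning every entry of partitionsToBeDeleted on each call.

```java
import java.util.Set;

// Illustrative sketch, not the actual ControllerChannelManager code.
class UpdateMetadataBatch {
    private final Set<String> partitionsToBeDeleted; // retained by the deletion manager

    UpdateMetadataBatch(Set<String> partitionsToBeDeleted) {
        this.partitionsToBeDeleted = partitionsToBeDeleted;
    }

    // Before: every call iterates ALL of partitionsToBeDeleted (size n),
    // even when only a handful of partitions are affected.
    // After: touch only the partitions passed in, with an O(1) lookup each.
    void addUpdateMetadataRequestForBrokers(Set<Integer> brokerIds, Set<String> partitions) {
        for (String partition : partitions) {
            boolean beingDeleted = partitionsToBeDeleted.contains(partition);
            updateMetadataRequestPartitionInfo(partition, beingDeleted);
        }
    }

    private void updateMetadataRequestPartitionInfo(String partition, boolean beingDeleted) {
        // build the UpdateMetadataRequest entry for this partition (elided)
    }
}
```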
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 need to be transitioned to OfflineReplica state, possibly because of a broker going offline, the following call stack will occur on the controller, causing an iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Create a cluster with 2 brokers having ids 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4. _Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (10 x 10). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) of the logs above. c. At the bottom of the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. Performance improvement benchmark: if we perform the steps above with topic a0 having 5000 partitions and topic a1 having 5000 partitions, when broker 2 goes down it takes the controller ~4 minutes to go through controlled shutdown, detect the broker failure through ZooKeeper, and transition all replicas to OfflineReplica state. After applying the patch, the same process takes 23 seconds. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've