[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-08 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357817#comment-16357817
 ] 

Gwen Shapira edited comment on KAFKA-2967 at 2/9/18 1:45 AM:
-

Having used both, I have to say that I find Asciidoc's flavor of markup much 
easier to use. The available plugins (especially for Atom) are better too.

[~vikgamov] is a Gradle expert (or at least an official reviewer of O'Reilly 
Gradle book), and given our Gradle/doc integration pains in the past, it is 
good to have a community volunteer with the necessary expertise to maintain the 
integration.

How does everyone else feel about Asciidoc vs RST?

 (Sorry for bikeshedding, [~ewencp], but I think it is worthwhile to think a 
bit about a decision we'll have to live with for the coming years).

 


was (Author: gwenshap):
Having used both, I have to say that I find Asciidoc's flavor of markup much 
easier to use. The available plugins (especially for Atom) are better too.

[~vikgamov] is a Gradle expert (or at least an official reviewer of O'Reilly 
Gradle book), and given our Gradle/doc integration pains in the past, it is 
good to have a community volunteer with the necessary expertise to maintain the 
integration.

How does everyone else feel about Asciidoc vs RST?

 

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-08 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357817#comment-16357817
 ] 

Gwen Shapira commented on KAFKA-2967:
-

Having used both, I have to say that I find Asciidoc's flavor of markup much 
easier to use. The available plugins (especially for Atom) are better too.

[~vikgamov] is a Gradle expert (or at least an official reviewer of O'Reilly 
Gradle book), and given our Gradle/doc integration pains in the past, it is 
good to have a community volunteer with the necessary expertise to maintain the 
integration.

How does everyone else feel about Asciidoc vs RST?

 

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6547) group offset reset and begin_offset ignored/no effect

2018-02-08 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357807#comment-16357807
 ] 

Jeff Widman commented on KAFKA-6547:


What is the specific command you are running?

How do you know there is no effect? Simply because the consumer is later unable 
to poll?

How do you know it's not a broken consumer?

> group offset reset and begin_offset ignored/no effect
> -
>
> Key: KAFKA-6547
> URL: https://issues.apache.org/jira/browse/KAFKA-6547
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: ubuntu 16, java 1.8
>Reporter: Dan
>Priority: Major
> Fix For: 0.11.0.2
>
>
> Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest 
> or anything> has no effect in 1.0. When my consumer group client connects and 
> requests a specific offset or the earliest, there is no effect and the consumer 
> is unable to poll, so no messages, even new ones, are received.
> I installed 0.11 and these problems are not manifest.
> I'm unfamiliar with the internals and put the offset manager as the possible 
> component, but that's a guess.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-08 Thread Vik Gamov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357798#comment-16357798
 ] 

Vik Gamov edited comment on KAFKA-2967 at 2/9/18 1:27 AM:
--

Guys,

Have we ever considered using Asciidoc 
[http://asciidoctor.org/docs/what-is-asciidoc/] for writing documentation 
rather than RST or Markdown?

There are a bunch of well-known projects that use Asciidoc 
[http://asciidoctor.org/docs/what-is-asciidoc/#who-s-using-asciidoc] (like the 
Git documentation and the Neo4j database docs; GitHub supports it too).

It's very powerful and suitable for writing complex text (like books).

O'Reilly uses it for book authors, and there are Gradle plugins to generate 
HTML5, PDF, and even mobile-optimized formats like EPUB and MOBI.

It has a wide range of integrations (including Gradle and Maven): 
[http://asciidoctor.org/docs/#references-and-developer-resources] 

Here is how it differs from Markdown (for example): 
[http://asciidoctor.org/docs/user-manual/#compared-to-markdown].

Thank you


was (Author: vikgamov):
Guys,

Have we ever considered using Asciidoc 
[http://asciidoctor.org/docs/what-is-asciidoc/] for writing documentation 
rather than RST or Markdown?

It's very powerful and suitable for writing complex text (like books).

O'Reilly uses it for book authors, and there are Gradle plugins to generate 
HTML5, PDF, and even mobile-optimized formats like EPUB and MOBI.

It has a wide range of integrations (including Gradle and Maven): 
[http://asciidoctor.org/docs/#references-and-developer-resources] 

Here is how it differs from Markdown (for example): 
http://asciidoctor.org/docs/user-manual/#compared-to-markdown.

Thank you

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-08 Thread Vik Gamov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357798#comment-16357798
 ] 

Vik Gamov commented on KAFKA-2967:
--

Guys,

Have we ever considered using Asciidoc 
[http://asciidoctor.org/docs/what-is-asciidoc/] for writing documentation 
rather than RST or Markdown?

It's very powerful and suitable for writing complex text (like books).

O'Reilly uses it for book authors, and there are Gradle plugins to generate 
HTML5, PDF, and even mobile-optimized formats like EPUB and MOBI.

It has a wide range of integrations (including Gradle and Maven): 
[http://asciidoctor.org/docs/#references-and-developer-resources] 

Here is how it differs from Markdown (for example): 
http://asciidoctor.org/docs/user-manual/#compared-to-markdown.

Thank you

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize

2018-02-08 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6501.

   Resolution: Fixed
Fix Version/s: (was: 1.2.0)
   1.1.0

> Add test to verify markPartitionsForTruncation after fetcher thread pool 
> resize 
> 
>
> Key: KAFKA-6501
> URL: https://issues.apache.org/jira/browse/KAFKA-6501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> Follow-on task from KAFKA-6242



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357749#comment-16357749
 ] 

ASF GitHub Bot commented on KAFKA-6501:
---

hachikuji closed pull request #4539: KAFKA-6501: Dynamic broker config tests 
updates and metrics fix
URL: https://github.com/apache/kafka/pull/4539
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 144632c6c5d..8a17528bfb7 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, 
NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
@@ -40,6 +39,10 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
+  val RequestQueueSizeMetric = "RequestQueueSize"
+  val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ProcessorMetricTag = "processor"
+
   def isRequestLoggingEnabled: Boolean = 
requestLogger.underlying.isDebugEnabled
 
   sealed trait BaseRequest
@@ -241,15 +244,16 @@ object RequestChannel extends Logging {
 }
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+  import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge("RequestQueueSize", new Gauge[Int] {
+  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
   def value = requestQueue.size
   })
 
-  newGauge("ResponseQueueSize", new Gauge[Int]{
+  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
 def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
   total + processor.responseQueueSize
 }
@@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends 
KafkaMetricsGroup {
   def addProcessor(processor: Processor): Unit = {
 if (processors.putIfAbsent(processor.id, processor) != null)
   warn(s"Unexpected processor with processorId ${processor.id}")
+
+newGauge(ResponseQueueSizeMetric,
+  new Gauge[Int] {
+def value = processor.responseQueueSize
+  },
+  Map(ProcessorMetricTag -> processor.id.toString)
+)
   }
 
   def removeProcessor(processorId: Int): Unit = {
 processors.remove(processorId)
+removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> 
processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room 
in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index fef412b7198..d37b5231594 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
 }
 
+private[kafka] object Processor {
+  val IdlePercentMetricName = "IdlePercent"
+  val NetworkProcessorMetricTag = "networkProcessor"
+  val ListenerMetricTag = "listener"
+}
+
 /**
  * Thread that processes all requests from a single connection. There are N of 
these running in parallel
  * each of which has its own selector
@@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
memoryPool: MemoryPool,
logContext: LogContext) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
+  import Processor._
   private object ConnectionId {
 def fromString(s: String): Option[ConnectionId] = s.split("-") match {
   case Array(local, remote, index) => 
BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
@@ -471,18 +478,11 @@ private[kafka] class Processor(val id: Int,
   private val responseQueue = new 
LinkedBlockingDeque[RequestChannel.Response]()
 
   private[kafka] val metricTags = mutable.LinkedHashMap(
-"listener" -> listenerName.value,
-"networkProcessor" -> id.toString
+ListenerMetricTag -> listenerName.value,
+NetworkProcessorMetricTag -> id.toString
   ).asJava
 
-  newGauge("ResponseQueueSize",
-new Gauge[Int] {
-  def value = responseQueue.size()
-},
-

[jira] [Commented] (KAFKA-6208) Reduce startup time for Kafka Connect workers

2018-02-08 Thread Satyajit varma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357745#comment-16357745
 ] 

Satyajit varma commented on KAFKA-6208:
---

[~rhauch], we could make the connectors, SMTs, and converters load 
asynchronously, like below:

{code}
CompletableFuture.runAsync(() -> {
    addPlugins(plugins.connectors(), loader);
    connectors.addAll(plugins.connectors());
}, Executors.newSingleThreadExecutor());

CompletableFuture.runAsync(() -> {
    addPlugins(plugins.converters(), loader);
    converters.addAll(plugins.converters());
}, Executors.newSingleThreadExecutor());

CompletableFuture.runAsync(() -> {
    addPlugins(plugins.transformations(), loader);
    transformations.addAll(plugins.transformations());
}, Executors.newSingleThreadExecutor());
{code}
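
For what it's worth, a small variation of the sketch above that uses one shared 
pool instead of three single-thread executors and blocks until all three scans 
have finished before the worker continues (illustrative only; addPlugins, 
plugins, loader and the three collections are the same hypothetical names as 
above, and the java.util.concurrent imports are assumed):

{code:java}
ExecutorService scanPool = Executors.newFixedThreadPool(3);   // one shared pool for all plugin scans

CompletableFuture<Void> connectorScan = CompletableFuture.runAsync(() -> {
    addPlugins(plugins.connectors(), loader);
    connectors.addAll(plugins.connectors());
}, scanPool);

CompletableFuture<Void> converterScan = CompletableFuture.runAsync(() -> {
    addPlugins(plugins.converters(), loader);
    converters.addAll(plugins.converters());
}, scanPool);

CompletableFuture<Void> transformationScan = CompletableFuture.runAsync(() -> {
    addPlugins(plugins.transformations(), loader);
    transformations.addAll(plugins.transformations());
}, scanPool);

// Wait for all three scans so the worker never starts with a partial plugin registry,
// then release the threads.
CompletableFuture.allOf(connectorScan, converterScan, transformationScan).join();
scanPool.shutdown();
{code}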

> Reduce startup time for Kafka Connect workers
> -
>
> Key: KAFKA-6208
> URL: https://issues.apache.org/jira/browse/KAFKA-6208
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Major
>
> Kafka Connect startup times are excessive with a handful of connectors on the 
> plugin path or classpath. We should not be scanning three times (once for 
> connectors, once for SMTs, and once for converters), and hopefully we can 
> avoid scanning directories that are clearly not plugin directories. 
> We should also consider using Java's Service Loader to quickly identify 
> connectors. The latter would require a KIP and would require time for 
> connectors to migrate, but we could be smarter about only scanning plugin 
> directories that need to be scanned.
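
For reference, a rough sketch of what the Service Loader idea mentioned in the 
description could look like (illustrative only: it assumes connector JARs ship 
a META-INF/services entry for the Connector interface, which they do not today, 
and pluginClassLoader stands in for whatever class loader Connect uses for the 
plugin path):

{code:java}
import java.util.ServiceLoader;
import org.apache.kafka.connect.connector.Connector;

// Discover connectors registered via META-INF/services instead of scanning every class.
ServiceLoader<Connector> discovered = ServiceLoader.load(Connector.class, pluginClassLoader);
for (Connector connector : discovered) {
    System.out.println("Found connector: " + connector.getClass().getName());
}
{code}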



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread Paul Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357331#comment-16357331
 ] 

Paul Davidson edited comment on KAFKA-6388 at 2/9/18 12:03 AM:
---

[~hachikuji] We saw this on non-compacted topics.


was (Author: pdavidson):
[~dhay]  We saw this on non-compacted topics.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> at 
> 

[jira] [Created] (KAFKA-6548) Migrate committed offsets from ZooKeeper to Kafka

2018-02-08 Thread Manikandan P (JIRA)
Manikandan P created KAFKA-6548:
---

 Summary: Migrate committed offsets from ZooKeeper to Kafka
 Key: KAFKA-6548
 URL: https://issues.apache.org/jira/browse/KAFKA-6548
 Project: Kafka
  Issue Type: Improvement
  Components: offset manager
Affects Versions: 0.10.0.0
 Environment: Windows
Reporter: Manikandan P


We were using a previous version of Kafka (0.8.x) where all the offset details 
were stored in ZooKeeper. 
Now we have moved to a newer version of Kafka (0.10.x) where all the topic 
offset details are stored in Kafka itself. 
We have to move all the topic offset details from ZooKeeper to Kafka for an 
existing application in production.
Kafka is installed on a Windows machine; we can't run kafka-consumer-groups.sh 
from Windows.
Please advise how to migrate committed offsets from ZooKeeper to Kafka.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6547) group offset reset and begin_offset ignored/no effect

2018-02-08 Thread Dan (JIRA)
Dan created KAFKA-6547:
--

 Summary: group offset reset and begin_offset ignored/no effect
 Key: KAFKA-6547
 URL: https://issues.apache.org/jira/browse/KAFKA-6547
 Project: Kafka
  Issue Type: Bug
  Components: offset manager
Affects Versions: 1.0.0
 Environment: ubuntu 16, java 1.8
Reporter: Dan
 Fix For: 0.11.0.2


Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest 
or anything> has no effect in 1.0. When my consumer group client connects and 
requests a specific offset or the earliest, there is no effect and the consumer 
is unable to poll, so no messages, even new ones, are received.

I installed 0.11 and these problems are not manifest.

I'm unfamiliar with the internals and put the offset manager as the possible 
component, but that's a guess.
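
For reference, a typical invocation of the reset tool looks like the following 
(note the script is kafka-consumer-groups.sh; the broker address, group, and 
topic below are placeholders):

{code}
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-earliest --execute
{code}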



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener

2018-02-08 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6546:
-

 Summary: Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing 
listener
 Key: KAFKA-6546
 URL: https://issues.apache.org/jira/browse/KAFKA-6546
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 1.2.0


In 1.1, if an endpoint is available on the broker processing a metadata 
request, but the corresponding listener is not available on the leader of a 
partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned 
UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some 
brokers are not configured with all listeners, or it could indicate a transient 
error when listeners are dynamically added. We want to treat the error as a 
transient error to process dynamic updates, but we should notify clients of the 
actual error. This change should be made when the MetadataRequest version is 
updated so that LEADER_NOT_AVAILABLE is still returned to older clients.

See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
 and  [https://github.com/apache/kafka/pull/4539] for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357626#comment-16357626
 ] 

Ismael Juma commented on KAFKA-6529:


[~damianguy] Yes, I think the PR should be merged today or tomorrow. 
[~rsivaram] and [~hachikuji] are on it.

> Broker leaks memory and file descriptors after sudden client disconnects
> 
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Graham Campbell
>Priority: Major
> Fix For: 1.1.0, 0.11.0.3, 1.0.2
>
>
> If a producer forcefully disconnects from a broker while it has staged 
> receives, that connection enters a limbo state where it is no longer 
> processed by the SocketServer.Processor, leaking the file descriptor for the 
> socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
> the rolling restart to upgrade, open file descriptors on the brokers started 
> climbing uncontrollably. In a few cases brokers reached our configured max 
> open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the 
> Selector.closingChannels list. If a client disconnects from the broker with 
> multiple pending produce requests, when the broker attempts to send an ack to 
> the client it receives an IOException because the TCP socket has been closed. 
> This triggers the Selector to close the channel, but because it still has 
> pending requests, it adds it to Selector.closingChannels to process those 
> requests. However, because that exception was triggered by trying to send a 
> response, the SocketServer.Processor has marked the channel as muted and will 
> no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (eg. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks file descriptor previously used for TCP socket and memory 
> for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in 
> Selector.pollSelectionKeys(...) record that that connection failed a send by 
> adding the KafkaChannel ID to Selector.failedSends. Then re-raise the 
> exception to still trigger the socket disconnection logic. Since every 
> exception raised in this function triggers a disconnect, we also treat any 
> exception while writing to the socket as a failed send.
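
A rough sketch of the proposed handling described above, written against the 
names the description itself mentions (illustrative pseudo-code, not the actual 
patch):

{code:java}
// Inside Selector.pollSelectionKeys(...), around the write to the socket:
try {
    channel.write();                      // attempt to send the queued response
} catch (Exception e) {
    failedSends.add(channel.id());        // record the failed send so staged receives are cleaned up on close
    throw e;                              // re-raise so the normal disconnect logic still runs
}
{code}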



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0

2018-02-08 Thread Chong Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357615#comment-16357615
 ] 

Chong Wang commented on KAFKA-6545:
---

[~ijuma] Thanks, I was thinking it's AdminClient.scala.

> AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
> -
>
> Key: KAFKA-6545
> URL: https://issues.apache.org/jira/browse/KAFKA-6545
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Priority: Major
>
> Not sure why this function is deleted 
> ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)]
> Our code was relying on it.
> {quote}final MetadataResponse.TopicMetadata topicMetadata = 
> AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
>  final Topic topic = new Topic();
>  topic.setPartitions(topicMetadata.partitionMetadata().size());
>  final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e 
> -> e.replicas().size()).sum();
>  topic.setReplications(replicas);
> {quote}
> Are there any alternatives?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0

2018-02-08 Thread Chong Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chong Wang resolved KAFKA-6545.
---
Resolution: Not A Problem

> AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
> -
>
> Key: KAFKA-6545
> URL: https://issues.apache.org/jira/browse/KAFKA-6545
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Priority: Major
>
> Not sure why this function is deleted 
> ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)]
> Our code was relying on it.
> {quote}final MetadataResponse.TopicMetadata topicMetadata = 
> AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
>  final Topic topic = new Topic();
>  topic.setPartitions(topicMetadata.partitionMetadata().size());
>  final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e 
> -> e.replicas().size()).sum();
>  topic.setReplications(replicas);
> {quote}
> Are there any alternatives?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-02-08 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6538:
---
Affects Version/s: (was: 1.2.0)
   1.1.0

> Enhance ByteStore exceptions with more context information
> --
>
> Key: KAFKA-6538
> URL: https://issues.apache.org/jira/browse/KAFKA-6538
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and 
> only have concrete key/value types on outer layers/wrappers of the stores.
> For this reason, the most inner {{RocksDBStore}} cannot provide useful error 
> messages anymore if a put/get/delete operation fails as it only handles plain 
> bytes.
> Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with 
> corresponding information for which key/value the operation failed in the 
> wrapping stores (KeyValueStore, WindowedStored, and SessionStore).
> Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} 
> exceptions.
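
To illustrate the idea, a minimal sketch of how a wrapping store could add the 
typed key back into the error (hypothetical wrapper code with assumed 
keyBytes/valueBytes helpers and a name field, not the actual Streams 
implementation):

{code:java}
public void put(final K key, final V value) {
    try {
        inner.put(keyBytes(key), valueBytes(value));   // the inner RocksDBStore only ever sees plain bytes
    } catch (final ProcessorStateException e) {
        // Re-throw with the typed key so users can tell which record failed.
        throw new ProcessorStateException("Failed to put key " + key + " into store " + name, e);
    }
}
{code}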



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread David Hay (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357588#comment-16357588
 ] 

David Hay commented on KAFKA-6388:
--

I'm pretty sure we're not using compacted topics.  It's whatever is the default 
out of the box.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
> ... 13 more
> 

[jira] [Commented] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0

2018-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357577#comment-16357577
 ] 

Ismael Juma commented on KAFKA-6545:


The commit description states why they were removed (they were internal APIs 
with no tests). The alternative is to use AdminClient (a public and supported 
API): 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
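
For example, a minimal sketch of the AdminClient equivalent of the snippet in 
the description (the Topic class is the reporter's own, "localhost:9092" is a 
placeholder, checked exceptions from get() are left unhandled for brevity, and 
the imports come from org.apache.kafka.clients.admin):

{code:java}
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    TopicDescription desc = admin.describeTopics(Collections.singleton(topicName))
                                 .all().get().get(topicName);
    final Topic topic = new Topic();
    topic.setPartitions(desc.partitions().size());
    topic.setReplications(desc.partitions().stream()
                              .mapToInt(p -> p.replicas().size()).sum());
}
{code}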

> AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
> -
>
> Key: KAFKA-6545
> URL: https://issues.apache.org/jira/browse/KAFKA-6545
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Priority: Major
>
> Not sure why this function is deleted 
> ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)]
> Our code was relying on it.
> {quote}final MetadataResponse.TopicMetadata topicMetadata = 
> AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
>  final Topic topic = new Topic();
>  topic.setPartitions(topicMetadata.partitionMetadata().size());
>  final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e 
> -> e.replicas().size()).sum();
>  topic.setReplications(replicas);
> {quote}
> Are there any alternatives?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0

2018-02-08 Thread Chong Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chong Wang updated KAFKA-6545:
--
Description: 
Not sure why this function is deleted 
([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)]

Our code was relying on it.
{quote}final MetadataResponse.TopicMetadata topicMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
 final Topic topic = new Topic();
 topic.setPartitions(topicMetadata.partitionMetadata().size());
 final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> 
e.replicas().size()).sum();
 topic.setReplications(replicas);
{quote}
Are there any alternatives?

  was:
Not sure why this function is deleted, our code was relying on it.

final MetadataResponse.TopicMetadata topicMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
final Topic topic = new Topic();
topic.setPartitions(topicMetadata.partitionMetadata().size());
final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> 
e.replicas().size()).sum();
topic.setReplications(replicas);


> AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0
> -
>
> Key: KAFKA-6545
> URL: https://issues.apache.org/jira/browse/KAFKA-6545
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Priority: Major
>
> Not sure why this function is deleted 
> ([https://github.com/apache/kafka/commit/1d24e10aeab616eede416201336e928b9a8efa98#diff-5d4a4a97554d3ac24efe68d98eb27b64)]
> Our code was relying on it.
> {quote}final MetadataResponse.TopicMetadata topicMetadata = 
> AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
>  final Topic topic = new Topic();
>  topic.setPartitions(topicMetadata.partitionMetadata().size());
>  final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e 
> -> e.replicas().size()).sum();
>  topic.setReplications(replicas);
> {quote}
> Are there any alternatives?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6545) AdminUtils.fetchTopicMetadataFromZk is not available in 1.0.0

2018-02-08 Thread Chong Wang (JIRA)
Chong Wang created KAFKA-6545:
-

 Summary: AdminUtils.fetchTopicMetadataFromZk is not available in 
1.0.0
 Key: KAFKA-6545
 URL: https://issues.apache.org/jira/browse/KAFKA-6545
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.0.0
Reporter: Chong Wang


Not sure why this function was deleted; our code was relying on it.

final MetadataResponse.TopicMetadata topicMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
final Topic topic = new Topic();
topic.setPartitions(topicMetadata.partitionMetadata().size());
final int replicas = topicMetadata.partitionMetadata().stream().mapToInt(e -> 
e.replicas().size()).sum();
topic.setReplications(replicas);



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"

2018-02-08 Thread Yu Yang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-6544:
---
Description: 
Our Kafka cluster encountered a few disk/xfs failures in the cloud VM 
instances. When a disk/xfs failure happened, the Kafka process did not exit 
gracefully. Instead, it ran into "" status, with port 9092 still being 
reachable. When failures like this happen, Kafka should shut down all threads 
and exit. The following are the Kafka logs from when the failure happened:

{code:java}
[2018-02-08 12:52:31,764] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:340)
at kafka.network.Acceptor.run(SocketServer.scala:283)
at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:340)
at kafka.network.Acceptor.run(SocketServer.scala:283)
at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:340)
at kafka.network.Acceptor.run(SocketServer.scala:283)
at java.lang.Thread.run(Thread.java:748)
 {code}

  was:
Our kafka cluster encountered a few disk/xfs failures in the cloud vm 
instances. When a disk/xfs failure happens, kafka process did not exit 
gracefully. Instead, it run into  "" status, with port 9092 still be 
reachable.  when failures like this happens, kafka should shutdown all threads 
and exit. The following is the kafka logs when the failure happens:

{code:java}
[2018-02-08 12:52:31,764] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:340)
at kafka.network.Acceptor.run(SocketServer.scala:283)
at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:340)
at kafka.network.Acceptor.run(SocketServer.scala:283)
at java.lang.Thread.run(Thread.java:748)
[2018-02-08 12:52:31,772] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:340)
at kafka.network.Acceptor.run(SocketServer.scala:283)
at java.lang.Thread.run(Thread.java:748)
 {code}


> kafka process should exit when it encounters "java.io.IOException: Too many 
> open files"  
> -
>
> Key: KAFKA-6544
> URL: https://issues.apache.org/jira/browse/KAFKA-6544
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, network
>Affects Versions: 0.10.2.1
>Reporter: Yu Yang
>Priority: Major
>
> Our kafka cluster encountered a few disk/xfs failures in the cloud vm 
> instances. When a disk/xfs failure happens, kafka process did not exit 
> gracefully. Instead, it ran into  "" status, with port 9092 still be 
> 

[jira] [Commented] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"

2018-02-08 Thread Yu Yang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357483#comment-16357483
 ] 

Yu Yang commented on KAFKA-6544:


[~cmccabe]  The kafka process is in `` status.  sudo ls -l 
/proc/$kafka_pid/fd returns 0.   I am also including  "netstat -pnt" output 
here. Connections are either in ESTABLISHED or CLOSE_WAIT status. 

{code}
proc/30413/fd]# sudo ls -l /proc/30413/fd
total 0
{code} 

{code}
netstat -pnt | grep "10.1.160.124:9092" | wc
116 812   11252
{code} 


{code}
netstat -pnt | grep "10.1.160.124:9092"
tcp   29  0 10.1.160.124:9092   10.1.25.241:55616   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:58624   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.9.121:33894CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:53886   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:43122   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:50766   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.26.165:34282   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.79.149:47682   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.163.135:44008  CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.66.116:52398   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.64.116:36656   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.207.247:51904  CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.9.16:45942 CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.131.15:57118   CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:55974   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.214.5:33040CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:33494   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.201.139:60230  CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.207.247:51792  CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:42858   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:44246   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.194.26:42406   CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:32902   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.169.94:35532   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.193.101:48832  CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.204.225:60946  CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:35772   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:46972   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:56226   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:46432   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:44436   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:4   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:47364   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:44908   ESTABLISHED 
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:43060   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.10.15:39282CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.181.86:55500   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.17.191:32812   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.141.30:52024   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.76.141:51366   CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:50940   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   10.1.11.196:44064   CLOSE_WAIT  
-   
tcp   65  0 10.1.160.124:9092   10.1.143.107:37116  CLOSE_WAIT  
-   
tcp   29  0 10.1.160.124:9092   10.1.25.241:37416   ESTABLISHED 
-   
tcp   65  0 10.1.160.124:9092   

[jira] [Updated] (KAFKA-6390) Update ZooKeeper to 3.4.11, Gradle and other minor updates

2018-02-08 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6390:
---
Fix Version/s: (was: 2.0.0)
   1.1.0

> Update ZooKeeper to 3.4.11, Gradle and other minor updates
> --
>
> Key: KAFKA-6390
> URL: https://issues.apache.org/jira/browse/KAFKA-6390
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 1.1.0
>
>
> https://issues.apache.org/jira/browse/ZOOKEEPER-2614 is a helpful fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6345:

Fix Version/s: 1.1.0

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
> Fix For: 1.1.0
>
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765, in the SenderMetrics constructor), which would be 
> driven by some thread reading JMX values
> # the HashMap in question would also be updated by some client IO thread 
> calling NetworkClient.doSend(), which calls into InFlightRequests.add()
> I guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client IO thread ...
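
To see the failure mode in isolation, here is a small standalone sketch of the 
same race (a plain HashMap iterated by one thread while another thread mutates 
it); it is deliberately unrelated to Kafka's actual classes:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ConcurrentModificationDemo {
    private static final Map<Integer, String> inFlight = new HashMap<>();  // not thread-safe

    public static void main(String[] args) {
        Thread ioThread = new Thread(() -> {                 // stands in for the client IO thread
            for (int i = 0; ; i++) inFlight.put(i, "request-" + i);
        });
        ioThread.setDaemon(true);
        ioThread.start();

        while (true) {                                       // stands in for the JMX/metrics reader
            int count = 0;
            for (String r : inFlight.values()) {
                count++;                                     // mirrors InFlightRequests.inFlightRequestCount()
            }
            // Usually dies quickly with ConcurrentModificationException from HashMap's iterator.
        }
    }
}
{code}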



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt resolved KAFKA-6345.
-
Resolution: Fixed

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
> Fix For: 1.1.0
>
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765, in the SenderMetrics constructor), which would be 
> driven by some thread reading JMX values
> # the HashMap in question would also be updated by some client IO thread 
> calling NetworkClient.doSend(), which calls into InFlightRequests.add()
> I guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client IO thread ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt reassigned KAFKA-6345:
---

Assignee: Sean McCauliff

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> Looking at the latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765, in the SenderMetrics constructor), which is driven by a 
> thread reading JMX values
> # the HashMap in question is also updated by the client I/O thread calling 
> NetworkClient.doSend(), which calls into InFlightRequests.add()
> I guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client I/O thread ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"

2018-02-08 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357456#comment-16357456
 ] 

Colin P. McCabe commented on KAFKA-6544:


What are the open file descriptors that you are seeing?  Can you get the output 
of {{sudo ls -l /proc/$KAFKA_PID/fd}} (where KAFKA_PID is the process ID of 
Kafka)?  And perhaps also {{netstat -pnt}}?

> kafka process should exit when it encounters "java.io.IOException: Too many 
> open files"  
> -
>
> Key: KAFKA-6544
> URL: https://issues.apache.org/jira/browse/KAFKA-6544
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, network
>Affects Versions: 0.10.2.1
>Reporter: Yu Yang
>Priority: Major
>
> Our Kafka cluster encountered a few disk/xfs failures in the cloud VM 
> instances. When a disk/xfs failure happens, the Kafka process does not exit 
> gracefully. Instead, it runs into "" status, with port 9092 still 
> reachable. When failures like this happen, Kafka should shut down all 
> threads and exit. The following is the Kafka log from when the failure happens:
> {code:java}
> [2018-02-08 12:52:31,764] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:748)
> [2018-02-08 12:52:31,772] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:748)
> [2018-02-08 12:52:31,772] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:748)
>  {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6544) kafka process should exit when it encounters "java.io.IOException: Too many open files"

2018-02-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357419#comment-16357419
 ] 

Ted Yu commented on KAFKA-6544:
---

One option is to shut down the process in SocketServer:
{code}
  case e: ControlThrowable => throw e
  case e: Throwable => error("Error occurred", e)
{code}
upon seeing an IOException with the matching message.
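
A rough Java illustration of that idea (a hypothetical helper, not the actual SocketServer change, which is Scala): inspect the throwable and halt the process when it is the fatal "Too many open files" IOException, instead of only logging it.

{code:java}
import java.io.IOException;

// Hypothetical helper sketching the proposal: treat "Too many open files" as
// fatal and terminate the broker process rather than logging and carrying on.
public final class FatalIoErrors {
    private FatalIoErrors() { }

    public static void maybeHalt(Throwable t) {
        if (t instanceof IOException
                && t.getMessage() != null
                && t.getMessage().contains("Too many open files")) {
            System.err.println("Fatal I/O error, terminating: " + t.getMessage());
            // halt() skips shutdown hooks, which may themselves need file descriptors
            Runtime.getRuntime().halt(1);
        }
    }
}
{code}

In the acceptor's error handling, such a check would run before the generic error("Error occurred", e) case shown above.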

> kafka process should exit when it encounters "java.io.IOException: Too many 
> open files"  
> -
>
> Key: KAFKA-6544
> URL: https://issues.apache.org/jira/browse/KAFKA-6544
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, network
>Affects Versions: 0.10.2.1
>Reporter: Yu Yang
>Priority: Major
>
> Our Kafka cluster encountered a few disk/xfs failures in the cloud VM 
> instances. When a disk/xfs failure happens, the Kafka process does not exit 
> gracefully. Instead, it runs into "" status, with port 9092 still 
> reachable. When failures like this happen, Kafka should shut down all 
> threads and exit. The following is the Kafka log from when the failure happens:
> {code:java}
> [2018-02-08 12:52:31,764] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:748)
> [2018-02-08 12:52:31,772] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:748)
> [2018-02-08 12:52:31,772] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:748)
>  {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread Graham Campbell (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357401#comment-16357401
 ] 

Graham Campbell commented on KAFKA-6388:


Most of the recent times we've run into this, it's been on non-compacted topics 
that have been idle for a while (no data in for > retention.ms) and then begin 
receiving data again. It's not happening to every replica: with 15 partitions 
and 3 replicas, sometimes one or two followers will encounter this for a given 
topic.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> 

[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread Paul Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357331#comment-16357331
 ] 

Paul Davidson commented on KAFKA-6388:
--

[~dhay]  We saw this on non-compacted topics.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
> ... 13 more
> [2017-12-19 15:16:24,302] INFO 

[jira] [Resolved] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-08 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6362.

   Resolution: Fixed
Fix Version/s: 1.1.0

> auto commit not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>Assignee: huxihx
>Priority: Major
> Fix For: 1.1.0
>
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 54
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto-commit. After adding some debug logging, I found 
> that the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
>  always returns true, and nextAutoCommitDeadline just increases 
> infinitely. Should there be a lookupCoordinator() after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
>  After I added lookupCoordinator() next to line 604, the consumer could 
> auto-commit offsets properly.
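
A self-contained model of the reporter's suggestion (hypothetical class and method names; the merged fix, quoted in the pull-request diff in the following message, took a slightly different approach): when the coordinator is unknown at auto-commit time, trigger a coordinator lookup instead of only pushing the deadline back by the retry backoff.

{code:java}
// Minimal model of the suggested behaviour; not the actual ConsumerCoordinator code.
public class AutoCommitScheduler {
    private final long autoCommitIntervalMs;
    private final long retryBackoffMs;
    private long nextAutoCommitDeadline;
    private boolean coordinatorKnown = false;

    public AutoCommitScheduler(long autoCommitIntervalMs, long retryBackoffMs, long now) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
    }

    public void maybeAutoCommitOffsetsAsync(long now) {
        if (!coordinatorKnown) {
            lookupCoordinator();                            // the proposed addition
            nextAutoCommitDeadline = now + retryBackoffMs;  // retry soon instead of drifting forever
        } else if (now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }

    public void onCoordinatorFound() { coordinatorKnown = true; }
    public void onCoordinatorDead()  { coordinatorKnown = false; }

    private void lookupCoordinator()        { /* issue a FindCoordinator request */ }
    private void doAutoCommitOffsetsAsync() { /* issue an OffsetCommit request */ }
}
{code}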



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357298#comment-16357298
 ] 

ASF GitHub Bot commented on KAFKA-6362:
---

hachikuji closed pull request #4326: KAFKA-6362: maybeAutoCommitOffsetsAsync 
should try to discover coordinator
URL: https://github.com/apache/kafka/pull/4326
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f847cd8..2f7fd58a66f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public void assign(Collection partitions) 
{
 
 // make sure the offsets of topic partitions the consumer is 
unsubscribing from
 // are committed since there will be no following rebalance
-this.coordinator.maybeAutoCommitOffsetsNow();
+
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
 log.debug("Subscribed to partition(s): {}", 
Utils.join(partitions, ", "));
 this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60eee82..d7c1ce9966e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public void commitOffsetsAsync(final Map offs
 public void onSuccess(Void value) {
 pendingAsyncCommits.decrementAndGet();
 doCommitOffsetsAsync(offsets, callback);
+client.pollNoWakeup();
 }
 
 @Override
@@ -623,20 +624,10 @@ public boolean commitOffsetsSync(Map offsets,
 return false;
 }
 
-private void maybeAutoCommitOffsetsAsync(long now) {
-if (autoCommitEnabled) {
-if (coordinatorUnknown()) {
-this.nextAutoCommitDeadline = now + retryBackoffMs;
-} else if (now >= nextAutoCommitDeadline) {
-this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
-doAutoCommitOffsetsAsync();
-}
-}
-}
-
-public void maybeAutoCommitOffsetsNow() {
-if (autoCommitEnabled && !coordinatorUnknown())
+public void maybeAutoCommitOffsetsAsync(long now) {
+if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
 doAutoCommitOffsetsAsync();
+}
 }
 
 private void doAutoCommitOffsetsAsync() {
@@ -650,8 +641,11 @@ public void onComplete(Map offsets, Exception
 log.warn("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, exception.getMessage());
 if (exception instanceof RetriableException)
 nextAutoCommitDeadline = Math.min(time.milliseconds() 
+ retryBackoffMs, nextAutoCommitDeadline);
+else
+nextAutoCommitDeadline = time.milliseconds() + 
autoCommitIntervalMs;
 } else {
 log.debug("Completed asynchronous auto-commit of offsets 
{}", offsets);
+nextAutoCommitDeadline = time.milliseconds() + 
autoCommitIntervalMs;
 }
 }
 });
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a71bea..c49339b6525 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public void testHeartbeatThreadClose() throws Exception {
 assertFalse("Heartbeat thread active after close", 
threads[i].getName().contains(groupId));
 }
 
+@Test
+public void testAutoCommitAfterCoordinatorBackToService() {
+ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
+ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+

[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-08 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357279#comment-16357279
 ] 

Jason Gustafson commented on KAFKA-6541:


[~anhldbk] This was fixed in KAFKA-6366 which will be included in 1.0.1. The 
current RC is available here: 
[http://home.apache.org/~ewencp/kafka-1.0.1-rc0/]. If you have time to confirm 
the fix and close this JIRA, we would appreciate it.

> StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
> --
>
> Key: KAFKA-6541
> URL: https://issues.apache.org/jira/browse/KAFKA-6541
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Anh Le
>Priority: Major
>
> There's something wrong with our client library when sending heart beats. 
> This bug seems to be identical to this one: 
> [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]
>  
> Here's the log:
>  
> {quote}2018-02-08 13:55:01,102 ERROR 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread
>  Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
> default-group':
> java.lang.StackOverflowError: null
>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>  at 
> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)
>  at 
> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
>  at 
> ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
>  at 
> ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
>  at 
> ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
>  at 
> ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
>  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
>  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
>  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
>  at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)
>  at ch.qos.logback.classic.Logger.info(Logger.java:583)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> 

[jira] [Resolved] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables

2018-02-08 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6543.

Resolution: Duplicate

> Allow KTables to be bootstrapped at start up, like GKTables
> ---
>
> Key: KAFKA-6543
> URL: https://issues.apache.org/jira/browse/KAFKA-6543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> In some use cases, it's desirable to have KTables "fully" bootstrapped (at 
> least on a best-effort basis) before the topology begins, similar to how a 
> GKTable does. For one, this could prevent join race conditions, which could be 
> a big problem if local KTable state has been lost.
>  
> Related to KAFKA-6542 Tables should trigger joins too, not just streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper

2018-02-08 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357221#comment-16357221
 ] 

Nick Travers commented on KAFKA-4914:
-

[~damianguy] - noticed this got updated recently. I've had a PR open for a 
while now (coming up on a year).

[~ijuma] has been helping me iterate on it. Would love to get that merged if 
possible!

> Partition re-assignment tool should check types before persisting state in 
> ZooKeeper
> 
>
> Key: KAFKA-4914
> URL: https://issues.apache.org/jira/browse/KAFKA-4914
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>Assignee: Nick Travers
>Priority: Major
> Fix For: 2.0.0
>
>
> The partition-reassignment tool currently allows non-type-safe information to 
> be persisted into ZooKeeper, which can result in a ClassCastException at 
> runtime for brokers.
> Specifically, this occurred when the broker assignment field was a List of 
> Strings, instead of a List of Integers.
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
> at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
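
For context, the reassignment JSON persisted to ZooKeeper is expected to carry broker ids as integers; a hypothetical illustration ("my-topic" is a placeholder) of the shape that works versus the string-typed variant that produces the ClassCastException above:

{code}
# accepted: replica broker ids are integers
{"version": 1, "partitions": [{"topic": "my-topic", "partition": 0, "replicas": [1, 2, 3]}]}

# triggers the ClassCastException above: replica broker ids serialized as strings
{"version": 1, "partitions": [{"topic": "my-topic", "partition": 0, "replicas": ["1", "2", "3"]}]}
{code}

The proposed check would reject the second form in the tool itself, before anything is written to ZooKeeper.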



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357214#comment-16357214
 ] 

Andrew Olson commented on KAFKA-2435:
-

[~jeffwidman] yes, that sounds quite reasonable to me.

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.
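
A compact sketch of the greedy assignment loop described above (illustrative only, not the attached KAFKA-2435.patch; the input maps are hypothetical): topics are ordered most-constrained first, and each partition goes to the interested consumer that currently holds the fewest partitions.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative greedy assignor for the strategy sketched above.
// partitionsPerTopic: topic -> partition count; subscriptions: consumer -> subscribed topics.
public class BalancedAssignor {

    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Set<String>> subscriptions) {
        // consumers interested in each topic
        Map<String, List<String>> consumersPerTopic = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : subscriptions.entrySet())
            for (String topic : e.getValue())
                consumersPerTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());

        // most constrained topics first: fewer consumers, then more partitions
        List<String> topics = new ArrayList<>(consumersPerTopic.keySet());
        topics.sort(Comparator
                .comparingInt((String t) -> consumersPerTopic.get(t).size())
                .thenComparingInt(t -> -partitionsPerTopic.get(t)));

        Map<String, List<String>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String consumer : subscriptions.keySet()) {
            assignment.put(consumer, new ArrayList<>());
            load.put(consumer, 0);
        }

        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                // hand the partition to the interested consumer with the fewest partitions so far
                String chosen = consumersPerTopic.get(topic).stream()
                        .min(Comparator.comparingInt(load::get))
                        .get();
                assignment.get(chosen).add(topic + "-" + p);
                load.put(chosen, load.get(chosen) + 1);
            }
        }
        return assignment;
    }
}
{code}

This is the "least loaded interested consumer" heuristic; the topic ordering keeps the most constrained choices from being made last, when the load is already uneven.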



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357203#comment-16357203
 ] 

Jeff Widman edited comment on KAFKA-2435 at 2/8/18 4:54 PM:


Can this be closed as wontfix? 

Two reasons:
 # it targets the deprecated (removed?) high-level consumer
 # KIP-54 addressed some of the concerns here (although possibly not all, as 
mentioned here: 
https://issues.apache.org/jira/browse/KAFKA-3297?focusedCommentId=15788229=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15788229)


was (Author: jeffwidman):
Can this be closed as wontfix? 

Two reasons:
 # it targets the deprecated (removed?) high-level consumer
 # KIP-54 addressed some of the concerns here, and IMHO is a better solution 
because it addresses both fairness and affinity/stickiness

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357203#comment-16357203
 ] 

Jeff Widman edited comment on KAFKA-2435 at 2/8/18 4:51 PM:


Can this be closed as wontfix? 

Two reasons:

 # it targets the deprecated (removed?) high-level consumer

# KIP-54 addressed some of the concerns here, and IMHO is a better solution 
because it addresses both fairness and affinity/stickiness


was (Author: jeffwidman):
Can this be closed as wontfix? 

Two reasons:

 

1) it targets the deprecated (removed?) high-level consumer

2) KIP-54 addressed some of the concerns here, and IMHO is a better solution 
because it addresses both fairness and affinity/stickiness

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357203#comment-16357203
 ] 

Jeff Widman commented on KAFKA-2435:


Can this be closed as wontfix? 

Two reasons:

 

1) it targets the deprecated (removed?) high-level consumer

2) KIP-54 addressed some of the concerns here, and IMHO is a better solution 
because it addresses both fairness and affinity/stickiness

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-08 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6504.

   Resolution: Fixed
Fix Version/s: 1.1.0

> Connect: Some per-task-metrics not working
> --
>
> Key: KAFKA-6504
> URL: https://issues.apache.org/jira/browse/KAFKA-6504
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Assignee: Robert Yokota
>Priority: Minor
> Fix For: 1.1.0
>
>
> Some Kafka Connect metrics seem to be wrong with respect to per-task reporting - at 
> least it seems like the MBean 
> "kafka.connect:type=source-task-metrics,connector=,task=x" 
> attribute "source-record-active-count" reports the same number for all x 
> tasks running in the same Kafka Connect instance/JVM. E.g. if I have a 
> source connector "my-connector" with 2 tasks that both run in the same 
> Kafka Connect instance, and I know that only one of them actually produces 
> anything (and therefore can have "active source-records"), both 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go 
> up (following each other). It should only go up for the one task that 
> actually produces something.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-08 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357015#comment-16357015
 ] 

Damian Guy commented on KAFKA-6529:
---

[~ijuma] should this go in to 1.1?

> Broker leaks memory and file descriptors after sudden client disconnects
> 
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Graham Campbell
>Priority: Major
> Fix For: 1.1.0, 0.11.0.3, 1.0.2
>
>
> If a producer forcefully disconnects from a broker while it has staged 
> receives, that connection enters a limbo state where it is no longer 
> processed by the SocketServer.Processor, leaking the file descriptor for the 
> socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
> the rolling restart to upgrade, open file descriptors on the brokers started 
> climbing uncontrollably. In a few cases brokers reached our configured max 
> open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the 
> Selector.closingChannels list. If a client disconnects from the broker with 
> multiple pending produce requests, when the broker attempts to send an ack to 
> the client it receives an IOException because the TCP socket has been closed. 
> This triggers the Selector to close the channel, but because it still has 
> pending requests, it adds it to Selector.closingChannels to process those 
> requests. However, because that exception was triggered by trying to send a 
> response, the SocketServer.Processor has marked the channel as muted and will 
> no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (eg. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks file descriptor previously used for TCP socket and memory 
> for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in 
> Selector.pollSelectionKeys(...) record that that connection failed a send by 
> adding the KafkaChannel ID to Selector.failedSends. Then re-raise the 
> exception to still trigger the socket disconnection logic. Since every 
> exception raised in this function triggers a disconnect, we also treat any 
> exception while writing to the socket as a failed send.
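
A sketch of that bookkeeping in isolation (an illustrative Java model with made-up names; the real change belongs in Selector.pollSelectionKeys): any exception raised while writing to a channel first records the channel id as a failed send and is then rethrown, so the normal disconnection path still runs but knows not to park the muted channel in closingChannels.

{code:java}
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Illustrative model of the proposed fix: remember which channels failed a send
// so the close path can drop them instead of leaking the socket and the staged
// receive queue for that connection.
public class SendTracker {
    private final Set<String> failedSends = new HashSet<>();

    public void write(String channelId, WriteOp op) throws IOException {
        try {
            op.run();
        } catch (IOException e) {
            failedSends.add(channelId);   // record the failed send
            throw e;                      // re-raise to trigger the disconnection logic
        }
    }

    public boolean sendFailed(String channelId) {
        return failedSends.contains(channelId);
    }

    @FunctionalInterface
    public interface WriteOp {
        void run() throws IOException;
    }
}
{code}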



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5919) Adding checks on "version" field for tools using it

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5919:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Adding checks on "version" field for tools using it
> ---
>
> Key: KAFKA-5919
> URL: https://issues.apache.org/jira/browse/KAFKA-5919
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
> Fix For: 2.0.0
>
>
> Hi,
> the kafka-delete-records script allows users to pass information about records 
> to delete through a JSON file. Such a file, as described in the command help, 
> is made up of a "partitions" array and a "version" field. Reading 
> [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient]
>  and the DeleteRecords API (Key: 21) description, it's not clear what that 
> field is for, and it's not even used at all (in the current implementation).
> It turned out that the field is there for backward compatibility in case the 
> JSON format changes in the future. This JIRA is about adding more 
> checks on the "version" field: keeping it optional but assuming the 
> earliest version (currently 1) if it's omitted from the JSON file.
> The same applies to kafka-reassign-partitions, which takes a topics-to-move JSON 
> file as input (used with the --generate option) and a partitions-to-move.json 
> file (used with the --execute option). In both cases the same logic as above 
> can be applied.
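
For reference, the offsets JSON consumed by kafka-delete-records looks roughly like the following (topic name and offset are placeholders); the proposal is to treat a missing "version" as the earliest version rather than ignoring the field entirely:

{code}
{"partitions": [{"topic": "my-topic", "partition": 0, "offset": 42}], "version": 1}
{code}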



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6448) Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the annotation

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6448:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the 
> annotation
> ---
>
> Key: KAFKA-6448
> URL: https://issues.apache.org/jira/browse/KAFKA-6448
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Hongyuan Li
>Priority: Minor
> Fix For: 2.0.0
>
> Attachments: KAFKA-6448-1.patch, KAFKA-6448-2.patch
>
>
> In the annotation, it said 
> {code}*This feature must be enabled with -Dmx4jenable=true*{code}
> *which is not compatible with the code* 
> {code}
> props.getBoolean("kafka_mx4jenable", false)
> {code}
> patch KAFKA-6448-1.patch modifies the code, and KAFKA-6448-2.patch modifies 
> the annotation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5532) Making bootstrap.servers property a first citizen option for the ProducerPerformance

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5532:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Making bootstrap.servers property a first citizen option for the 
> ProducerPerformance
> 
>
> Key: KAFKA-5532
> URL: https://issues.apache.org/jira/browse/KAFKA-5532
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Trivial
> Fix For: 2.0.0
>
>
> Hi,
> using the ProducerPerformance tool you have to specify the bootstrap.servers 
> option through the producer-props or producer-config option. It would be better 
> to have bootstrap.servers as a first-class option like in all the other tools, 
> i.e. a dedicated --bootstrap-servers option.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5517) Support linking to particular configuration parameters

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5517:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Support linking to particular configuration parameters
> --
>
> Key: KAFKA-5517
> URL: https://issues.apache.org/jira/browse/KAFKA-5517
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.0.0
>
>
> Currently the configuration parameters are documented in long tables, and it's 
> only possible to link to the heading before a particular table. When 
> discussing configuration parameters on forums it would be helpful to be able 
> to link to the particular parameter under discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5479) Docs for authorization omit authorizer.class.name

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5479:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Docs for authorization omit authorizer.class.name
> -
>
> Key: KAFKA-5479
> URL: https://issues.apache.org/jira/browse/KAFKA-5479
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.0.0
>
>
> The documentation in §7.4 Authorization and ACLs doesn't mention the 
> {{authorizer.class.name}} setting. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5692:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: kip, patch-available
> Fix For: 2.0.0
>
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an 
> AdminClient API created to be used instead. 
> This change will require a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5359:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Exceptions from RequestFuture lack parts of the stack trace
> ---
>
> Key: KAFKA-5359
> URL: https://issues.apache.org/jira/browse/KAFKA-5359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Magnus Reftel
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.0.0
>
>
> When an exception occurs within a task that reports its result using a 
> RequestFuture, that exception is stored in a field on the RequestFuture using 
> the {{raise}} method. In many places in the code where such futures are 
> completed, that exception is then thrown directly using {{throw 
> future.exception();}} (see e.g. 
> [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]).
> This means that the exception that ends up in client code only has stack 
> traces related to the original exception, but nothing leading up to the 
> completion of the future. The client therefore gets no indication of what was 
> going on in the client code - only that it somehow ended up in the Kafka 
> libraries, and that a task failed at some point.
> One solution to this is to use the exceptions from the future as causes for 
> chained exceptions, so that the client gets a stack trace that shows what the 
> client was doing, in addition to getting the stack traces for the exception 
> in the task.
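
A minimal sketch of the chaining suggested above, assuming a small helper around RequestFuture (the helper name and the wrapping exception type are illustrative, not the actual fix):

{code}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.clients.consumer.internals.RequestFuture;

// Rethrow a failed future's exception as the *cause* of a new exception, so the
// caller's stack trace also shows where the future was awaited.
static <T> T getOrThrowChained(RequestFuture<T> future, String message) {
    if (future.failed())
        throw new KafkaException(message, future.exception());
    return future.value();
}
{code}

Call sites such as Fetcher.getTopicMetadata would then use something like this helper instead of {{throw future.exception();}}.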



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4950:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Sébastien Launay
>Priority: Minor
> Fix For: 2.0.0, 1.0.2
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting hash set is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
> public class KafkaMetricSet implements MetricSet {
>     private final KafkaConsumer<?, ?> client;
>     public KafkaMetricSet(KafkaConsumer<?, ?> client) {
>         this.client = client;
>     }
>     @Override
>     public Map<String, Metric> getMetrics() {
>         final Map<String, Metric> gauges = new HashMap<String, Metric>();
>         Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
>         for (final Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
>             gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
>                 @Override
>                 public Double getValue() {
>                     return e.getValue().value(); // exception thrown here
>                 }
>             });
>         }
>         return Collections.unmodifiableMap(gauges);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4794:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 2.0.0
>
>
> Currently the offset storage is only accessible from SourceTask, so that tasks 
> can be properly initialized after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progress of each 
> task, it would be helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> That way, we could have a background thread that requests a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on shared storage to detect and stream new 
> files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the byte offset
>  - the byte size 
> Tasks become idle when their assigned files are completed (i.e. 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector is 
> to detect slow or failed tasks and, if necessary, to restart all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,
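
A rough sketch of how the connector-side monitoring could look if the proposed accessor existed (the context.offsetStorageReader() method, the "filename"/"position"/"size" offset keys and the helper below are all illustrative, not an existing API):

{code}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.storage.OffsetStorageReader;

// Background check run by the connector: has every assigned file been fully streamed?
boolean allFilesCompleted(OffsetStorageReader reader, Iterable<String> assignedFiles) {
    for (String file : assignedFiles) {
        Map<String, Object> offset =
            reader.offset(Collections.singletonMap("filename", file));
        if (offset == null) return false; // task has not produced anything yet
        long position = (Long) offset.get("position");
        long size = (Long) offset.get("size");
        if (position < size) return false; // file not fully streamed yet
    }
    return true; // connector may now stop tasks or request a task reconfiguration
}
{code}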



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4126) No relevant log when the topic is non-existent

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4126:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> No relevant log when the topic is non-existent
> --
>
> Key: KAFKA-4126
> URL: https://issues.apache.org/jira/browse/KAFKA-4126
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balázs Barnabás
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.0.0
>
>
> When a producer sends a ProducerRecord to a Kafka topic that doesn't exist, 
> there is no relevant debug/error log that points out the error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4893) async topic deletion conflicts with max topic length

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4893:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.0.0
>
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic deletion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe LinkedIn has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.
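
A small sketch of the length budget described above (the 255-character and 5-digit assumptions are the ones quoted from the documentation):

{code}
int filesystemLimit = 255;                         // typical max directory name length
int partitionSuffix = 1 + 5;                       // "-" plus up to 5 partition digits
int deleteSuffix    = 1 + 32 + "-delete".length(); // "." + 32-char uuid + "-delete" = 40
int currentTopicMax = filesystemLimit - partitionSuffix;                // 249
int safeTopicMax    = filesystemLimit - partitionSuffix - deleteSuffix; // 209
{code}

So a topic name longer than roughly 209 characters can pass validation yet still fail the rename performed during asynchronous deletion.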



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3999) Consumer bytes-fetched metric uses decompressed message size

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3999:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Consumer bytes-fetched metric uses decompressed message size
> 
>
> Key: KAFKA-3999
> URL: https://issues.apache.org/jira/browse/KAFKA-3999
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.0.0
>
>
> It looks like the computation for the bytes-fetched metrics uses the size of 
> the decompressed message set. I would have expected it to be based off of the 
> raw size of the fetch responses. Perhaps it would be helpful to expose both 
> the raw and decompressed fetch sizes? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4931) stop script fails due 4096 ps output limit

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4931:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> stop script fails due 4096 ps output limit
> --
>
> Key: KAFKA-4931
> URL: https://issues.apache.org/jira/browse/KAFKA-4931
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0
>Reporter: Amit Jain
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.0.0
>
>
> When run, the script bin/zookeeper-server-stop.sh fails to stop the ZooKeeper 
> server process if the ps output exceeds the 4096 character limit of Linux. I 
> think that instead of ps we can use ${JAVA_HOME}/bin/jps -vl | grep 
> QuorumPeerMain, which would correctly stop the ZooKeeper process. Currently we 
> are using kill with PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep 
> -v grep | awk '{print $1}')



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3733) Avoid long command lines by setting CLASSPATH in environment

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3733:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Avoid long command lines by setting CLASSPATH in environment
> 
>
> Key: KAFKA-3733
> URL: https://issues.apache.org/jira/browse/KAFKA-3733
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Adrian Muraru
>Assignee: Adrian Muraru
>Priority: Minor
> Fix For: 2.0.0
>
>
> {{kafka-run-class.sh}} sets the JVM classpath in the command line via {{-cp}}.
> This generates long command lines that get trimmed by the shell in commands 
> like ps, pgrep, etc.
> An alternative is to set CLASSPATH in the environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3575) Use console consumer access topic that does not exist, can not use "Control + C" to exit process

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3575:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Use console consumer access topic that does not exist, can not use "Control + 
> C" to exit process
> 
>
> Key: KAFKA-3575
> URL: https://issues.apache.org/jira/browse/KAFKA-3575
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: SUSE Linux Enterprise Server 11 SP3
>Reporter: NieWang
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.0.0
>
>
> 1. Use "sh kafka-console-consumer.sh --zookeeper 10.252.23.133:2181 --topic 
> topic_02" to start the console consumer. topic_02 does not exist.
> 2. You cannot use "Control + C" to exit the console consumer process. The 
> process is blocked.
> 3. Use jstack to check the process stack, as follows:
> linux:~ # jstack 122967
> 2016-04-18 15:46:06
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):
> "Attach Listener" #29 daemon prio=9 os_prio=0 tid=0x01781800 
> nid=0x1e0c8 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "Thread-4" #27 prio=5 os_prio=0 tid=0x018a4000 nid=0x1e08a waiting on 
> condition [0x7ffbe5ac]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe00ed3b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101)
> "SIGINT handler" #28 daemon prio=9 os_prio=0 tid=0x019d5800 
> nid=0x1e089 in Object.wait() [0x7ffbe5bc1000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.$$YJP$$wait(Native Method)
> at java.lang.Object.wait(Object.java)
> at java.lang.Thread.join(Thread.java:1245)
> - locked <0xe71fd4e8> (a kafka.tools.ConsoleConsumer$$anon$1)
> at java.lang.Thread.join(Thread.java:1319)
> at 
> java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
> at 
> java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
> at java.lang.Shutdown.runHooks(Shutdown.java:123)
> at java.lang.Shutdown.sequence(Shutdown.java:167)
> at java.lang.Shutdown.exit(Shutdown.java:212)
> - locked <0xe00abfd8> (a java.lang.Class for 
> java.lang.Shutdown)
> at java.lang.Terminator$1.handle(Terminator.java:52)
> at sun.misc.Signal$1.run(Signal.java:212)
> at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-2" #20 daemon prio=5 os_prio=0 
> tid=0x7ffbec77a800 nid=0x1e079 waiting on condition [0x7ffbe66c8000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe6fa6438> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-1" #19 daemon prio=5 os_prio=0 
> tid=0x7ffbec783000 nid=0x1e078 waiting on condition [0x7ffbe67c9000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe6fa6438> (a 
> 

[jira] [Updated] (KAFKA-5914) Return MessageFormatVersion and MessageMaxBytes in MetadataResponse

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5914:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Return MessageFormatVersion and MessageMaxBytes in MetadataResponse
> ---
>
> Key: KAFKA-5914
> URL: https://issues.apache.org/jira/browse/KAFKA-5914
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 2.0.0
>
>
> As part of KIP-192, we want to send two additional fields in the 
> {{TopicMetadata}} which is part of the {{MetadataResponse}}. These fields are 
> the {{MessageFormatVersion}} and the {{MessageMaxBytes}}.
> The {{MessageFormatVersion}} is required to implement 
> https://issues.apache.org/jira/browse/KAFKA-5794 . The latter will be 
> implemented in a future release, but with the changes proposed here, the said 
> future release will be backward compatible with 1.0.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5952) Refactor Consumer Fetcher metrics

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5952:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Refactor Consumer Fetcher metrics
> -
>
> Key: KAFKA-5952
> URL: https://issues.apache.org/jira/browse/KAFKA-5952
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5951) Autogenerate Producer RecordAccumulator metrics

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5951:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Autogenerate Producer RecordAccumulator metrics
> ---
>
> Key: KAFKA-5951
> URL: https://issues.apache.org/jira/browse/KAFKA-5951
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6390) Update ZooKeeper to 3.4.11, Gradle and other minor updates

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-6390:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Update ZooKeeper to 3.4.11, Gradle and other minor updates
> --
>
> Key: KAFKA-6390
> URL: https://issues.apache.org/jira/browse/KAFKA-6390
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.0.0
>
>
> https://issues.apache.org/jira/browse/ZOOKEEPER-2614 is a helpful fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5944) Add unit tests for handling of authentication failures in clients

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5944:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Add unit tests for handling of authentication failures in clients
> -
>
> Key: KAFKA-5944
> URL: https://issues.apache.org/jira/browse/KAFKA-5944
> Project: Kafka
>  Issue Type: Test
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.0.0
>
>
> KAFKA-5854 improves the handling of authentication failures in clients and has added 
> integration tests and some basic client-side tests that create actual 
> connections to a mock server. It will be good to add a set of tests for 
> producers, consumers etc. that use MockClient to add more extensive tests for 
> various scenarios.
> cc [~hachikuji] [~vahid]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5886:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
> Fix For: 2.0.0
>
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.
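
A sketch of how the proposed config would be set once available (the 120000 ms value is just an example, not necessarily the KIP's default):

{code}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// upper bound from the return of send() until the record is acked, fails, or expires
props.put("delivery.timeout.ms", "120000");
// retries can then be left high, since the new timeout bounds total delivery time
props.put("retries", Integer.toString(Integer.MAX_VALUE));
{code}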



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5637) Document compatibility and release policies

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5637:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Document compatibility and release policies
> ---
>
> Key: KAFKA-5637
> URL: https://issues.apache.org/jira/browse/KAFKA-5637
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Ismael Juma
>Assignee: Sönke Liebau
>Priority: Major
> Fix For: 2.0.0
>
>
> We should document our compatibility and release policies in one place so 
> that people have the correct expectations. This is generally important, but 
> more so now that we are releasing 1.0.0.
> I extracted the following topics from the mailing list thread as the ones 
> that should be documented as a minimum: 
> *Code stability*
> * Explanation of stability annotations and their implications
> * Explanation of what public apis are
> * *Discussion point: * Do we want to keep the _unstable_ annotation or is 
> _evolving_ sufficient going forward?
> *Support duration*
> * How long are versions supported?
> * How far are bugfixes backported?
> * How far are security fixes backported?
> * How long are protocol versions supported by subsequent code versions?
> * How long are older clients supported?
> * How long are older brokers supported?
> I will create an initial pull request to add a section to the documentation 
> as basis for further discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5445:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Document exceptions thrown by AdminClient methods
> -
>
> Key: KAFKA-5445
> URL: https://issues.apache.org/jira/browse/KAFKA-5445
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients
>Reporter: Ismael Juma
>Assignee: Andrey Dyachkov
>Priority: Major
> Fix For: 2.0.0, 1.0.2
>
>
> AdminClient should document the exceptions that users may have to handle.
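
As an illustration of what the javadocs should spell out: AdminClient results are futures, so most documented exceptions surface as the cause of an ExecutionException. A minimal sketch (topic name and settings are arbitrary):

{code}
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

void createTopicIfMissing(AdminClient admin) throws InterruptedException {
    try {
        admin.createTopics(Collections.singleton(new NewTopic("my-topic", 3, (short) 2)))
             .all().get();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof TopicExistsException) {
            // a case the docs should call out as recoverable
        } else {
            throw new RuntimeException(e.getCause());
        }
    }
}
{code}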



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5403) Transactions system test should dedup consumed messages by offset

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5403:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Transactions system test should dedup consumed messages by offset
> -
>
> Key: KAFKA-5403
> URL: https://issues.apache.org/jira/browse/KAFKA-5403
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 2.0.0
>
>
> In KAFKA-5396, we saw that the consumers which verify the data in multiple 
> topics could read the same offsets multiple times, for instance when a 
> rebalance happens. 
> This would detect spurious duplicates, causing the test to fail. We should 
> dedup the consumed messages by offset and only fail the test if we have 
> duplicate values for a unique set of offsets.
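
A minimal sketch of the proposed check (the record representation is illustrative): first collapse re-reads of the same partition/offset, then report values that still occur at more than one distinct offset as real duplicates.

{code}
import java.util.*;

// consumed: list of {partition, offset, value} triples read by the verifying consumer
static List<Long> realDuplicates(List<long[]> consumed) {
    Map<String, Long> byOffset = new LinkedHashMap<>();
    for (long[] r : consumed)
        byOffset.putIfAbsent(r[0] + "-" + r[1], r[2]); // ignore re-reads of the same offset
    Set<Long> seen = new HashSet<>();
    List<Long> dups = new ArrayList<>();
    for (long v : byOffset.values())
        if (!seen.add(v)) dups.add(v);
    return dups; // the test should fail only if this list is non-empty
}
{code}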



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4701:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Allow kafka brokers to dynamically reload truststore without restarting.
> 
>
> Key: KAFKA-4701
> URL: https://issues.apache.org/jira/browse/KAFKA-4701
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Allen Xiang
>Priority: Major
>  Labels: security
> Fix For: 2.0.0
>
>
> Right now, in order to add SSL clients (update broker truststores), a rolling 
> restart of all brokers is required. This is very time consuming and 
> unnecessary. A dynamic truststore manager is needed to reload truststore from 
> file system without restarting brokers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5272) Improve validation for Alter Configs (KIP-133)

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5272:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Improve validation for Alter Configs (KIP-133)
> --
>
> Key: KAFKA-5272
> URL: https://issues.apache.org/jira/browse/KAFKA-5272
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 2.0.0
>
>
> TopicConfigHandler.processConfigChanges() warns about certain topic configs. 
> We should include such validations in alter configs and reject the change if 
> the validation fails. Note that this should be done without changing the 
> behaviour of the ConfigCommand (as it does not have access to the broker 
> configs).
> We should consider adding other validations like KAFKA-4092 and KAFKA-4680.
> Finally, the AlterConfigsPolicy mentioned in KIP-133 will be added at the 
> same time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5276) Support derived and prefixed configs in DescribeConfigs (KIP-133)

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5276:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Support derived and prefixed configs in DescribeConfigs (KIP-133)
> -
>
> Key: KAFKA-5276
> URL: https://issues.apache.org/jira/browse/KAFKA-5276
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 2.0.0
>
>
> The broker supports config overrides per listener. The way we do that is by 
> prefixing the configs with the listener name. These configs are not defined 
> by ConfigDef and they don't appear in `values()`. They do appear in 
> `originals()`. We should change the code so that we return these configs. 
> Because these configs are read-only, nothing needs to be done for 
> AlterConfigs.
> With regards to derived configs, an example is advertised.listeners, which 
> falls back to listeners. This is currently done outside AbstractConfig. We 
> should look into including these into AbstractConfig so that the fallback 
> happens for the returned configs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5029) cleanup javadocs and logging

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5029:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> cleanup javadocs and logging
> 
>
> Key: KAFKA-5029
> URL: https://issues.apache.org/jira/browse/KAFKA-5029
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.0.0
>
>
> Remove state change logger, splitting it up into the controller logs or 
> broker logs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4914:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Partition re-assignment tool should check types before persisting state in 
> ZooKeeper
> 
>
> Key: KAFKA-4914
> URL: https://issues.apache.org/jira/browse/KAFKA-4914
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>Assignee: Nick Travers
>Priority: Major
> Fix For: 2.0.0
>
>
> The partition-reassignment tool currently allows non-type-safe information to 
> be persisted into ZooKeeper, which can result in a ClassCastException at 
> runtime for brokers.
> Specifically, this occurred when the broker assignment field was a List of 
> Strings, instead of a List of Integers.
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
> at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
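
For illustration, the difference in the persisted reassignment JSON (topic name is arbitrary):

{code}
// accepted today but unsafe: replica ids serialized as strings
{"version": 1, "partitions": [
  {"topic": "my-topic", "partition": 0, "replicas": ["1", "2", "3"]}
]}

// what the tool should enforce: replica ids as integers
{"version": 1, "partitions": [
  {"topic": "my-topic", "partition": 0, "replicas": [1, 2, 3]}
]}
{code}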



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-2435:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3297:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2018-02-08 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356998#comment-16356998
 ] 

Damian Guy commented on KAFKA-4879:
---

[~hachikuji] is this going to make it for 1.1?

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set<TopicPartition> partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(30000);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4307:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Inconsistent parameters between console producer and consumer
> -
>
> Key: KAFKA-4307
> URL: https://issues.apache.org/jira/browse/KAFKA-4307
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Balint Molnar
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> kafka-console-producer uses --broker-list while kafka-console-consumer uses 
> --bootstrap-server.
> Let's add --bootstrap-server to the producer for some consistency?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4665:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.0.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4862:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Kafka client connect to a shutdown node will block for a long time
> --
>
> Key: KAFKA-4862
> URL: https://issues.apache.org/jira/browse/KAFKA-4862
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.2.0
>Reporter: Pengwei
>Assignee: Pengwei
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently in our test env, we found that after one of the broker nodes crashes 
> (reboot or OS crash), the client may still be connecting to the crashed node to 
> send metadata or other requests, and it needs several minutes to become aware 
> that the connection has timed out before trying another node for the request. 
> So the client may still not be aware of the metadata change after several 
> minutes.
> We don't have a connection timeout for the network client; we should add one 
> for the client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4249) Document how to customize GC logging options for broker

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4249:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Document how to customize GC logging options for broker
> ---
>
> Key: KAFKA-4249
> URL: https://issues.apache.org/jira/browse/KAFKA-4249
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Jim Hoagland
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.0.0
>
>
> We wanted to enable GC logging for Kafka broker and saw that you can set 
> GC_LOG_ENABLED=true.  However, this didn't do what we wanted.  For example, 
> the GC log will be overwritten every time the broker gets restarted.  It 
> wasn't clear how we could do that (no documentation of it that I can find), 
> so I did some research by looking at the source code and did some testing and 
> found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to 
> starting broker.  I posted my solution to StackOverflow:
>   
> http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove
> (feel free to critique)
> That solution is now public, but it seems like the Kafka documentation should 
> say how to do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4203) Java producer default max message size does not align with broker default

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4203:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Java producer default max message size does not align with broker default
> -
>
> Key: KAFKA-4203
> URL: https://issues.apache.org/jira/browse/KAFKA-4203
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Grant Henke
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.0.0
>
>
> The Java producer sets max.request.size = 1048576 (the base 2 version of 1 MB 
> (MiB))
> The broker sets max.message.bytes = 1000012 (the base 10 value of 1 MB + 12 
> bytes for overhead)
> This means that by default the producer can try to produce messages larger 
> than the broker will accept resulting in RecordTooLargeExceptions.
> There were no similar issues in the old producer because it sets 
> max.message.size = 1000000 (the base 10 value of 1 MB)
> I propose we increase the broker default for max.message.bytes to 1048588 
> (the base 2 value of 1 MB (MiB) + 12 bytes for overhead) so that any message 
> produced with default configs from either producer does not result in a 
> RecordTooLargeException.
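
The mismatch in numbers, spelled out (values as stated above):

{code}
int producerMaxRequestSize = 1048576;       // producer default, 1 MiB (2^20)
int brokerMaxMessageBytes  = 1000000 + 12;  // broker default, decimal 1 MB + 12 bytes overhead
// 1048576 > 1000012, so a default-configured producer can build a request the broker rejects
int proposedBrokerDefault  = 1048576 + 12;  // 1048588, the value proposed above
{code}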



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3554) Generate actual data with specific compression ratio and add multi-thread support in the ProducerPerformance tool.

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3554:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Generate actual data with specific compression ratio and add multi-thread 
> support in the ProducerPerformance tool.
> --
>
> Key: KAFKA-3554
> URL: https://issues.apache.org/jira/browse/KAFKA-3554
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently the ProducerPerformance tool always generates the payload with the 
> same bytes. This does not work well for testing compressed data because the 
> payload is extremely compressible no matter how big it is.
> We can make some changes to make it more useful for compressed messages. 
> Currently I am generating the payload containing integers from a given range. 
> By adjusting the range of the integers, we can get different compression 
> ratios. 
> API-wise, we can either let the user specify the integer range or the expected 
> compression ratio (we will do some probing to get the corresponding range for 
> the user).
> Besides that, in many cases, it is useful to have multiple producer threads 
> when the producer threads themselves are the bottleneck. Admittedly people can 
> run multiple ProducerPerformance instances to achieve a similar result, but it 
> is still different from the real case when people actually use the producer.
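
A minimal sketch of the payload-generation idea (the range-to-ratio probing mentioned above is not shown; the bounds here are illustrative):

{code}
import java.nio.charset.StandardCharsets;
import java.util.Random;

// Draw integers from a range and render them as text: a narrow range repeats
// often and compresses well, a wide range compresses poorly.
static byte[] payload(int sizeBytes, int integerRange, Random random) {
    StringBuilder sb = new StringBuilder(sizeBytes);
    while (sb.length() < sizeBytes)
        sb.append(random.nextInt(integerRange)).append(' ');
    sb.setLength(sizeBytes);
    return sb.toString().getBytes(StandardCharsets.UTF_8);
}
// e.g. payload(1024, 10, r) compresses far better than payload(1024, 1000000, r)
{code}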



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3689) Exception when attempting to decrease connection count for address with no connections

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3689:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Exception when attempting to decrease connection count for address with no 
> connections
> --
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Ryan P
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-3689.log.redacted, kafka-3689-instrumentation.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org 
> with the same subject, I am creating this bug report.
> The following error occurs in one of the brokers in our 3 broker cluster, 
> which serves about 8000 topics. These topics are single partitioned with a 
> replication factor = 3. Each topic gets data at a low rate  – 200 bytes per 
> sec.  Leaders are balanced across the topics.
> Producers run from external servers (4 Ubuntu servers with same config as the 
> brokers), each producing to 2000 topics utilizing kafka-python library.
> This error message occurs repeatedly in one of the servers. Between the hours 
> of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such 
> occurrences. This was right after a cluster restart.
> This is not the first time we got this error in this broker. In those 
> instances, error occurred hours / days after cluster restart.
> =
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144 (actual network address 
> masked)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.run(SocketServer.scala:445)
> at java.lang.Thread.run(Thread.java:745)
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.run(SocketServer.scala:445)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3190:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 2.0.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callback will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3438:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Rack Aware Replica Reassignment should warn of overloaded brokers
> -
>
> Key: KAFKA-3438
> URL: https://issues.apache.org/jira/browse/KAFKA-3438
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Ben Stopford
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.0.0
>
>
> We've changed the replica reassignment code to be rack aware.
> One problem that might catch users out would be that they rebalance the 
> cluster using kafka-reassign-partitions.sh but their rack configuration means 
> that some high proportion of replicas are pushed onto a single, or small 
> number of, brokers. 
> This should be an easy problem to avoid, by changing the rack assignment 
> information, but we should probably warn users if they are going to create 
> something that is unbalanced. 
> So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack 
> awareness enabled. If I add a 13th machine, on a new rack, and run the 
> rebalance tool, that new machine will get ~6x as many replicas as the least 
> loaded broker. 
> Suggest a warning  be added to the tool output when --generate is called. 
> "The most loaded broker has 2.3x as many replicas as the least loaded 
> broker. This is likely due to an uneven distribution of brokers across racks. 
> You're advised to alter the rack config so there are approximately the same 
> number of brokers per rack" and displays the individual rack→#brokers and 
> broker→#replicas data for the proposed move.  
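To make the suggested check concrete, here is a hedged Java sketch of the ratio computation such a warning could use (the class, method, and sample numbers are hypothetical, not part of kafka-reassign-partitions.sh):

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RackImbalanceCheck {
    /** replicasPerBroker maps brokerId to the replica count in the proposed assignment. */
    static void warnIfUnbalanced(Map<Integer, Integer> replicasPerBroker, double threshold) {
        int max = Collections.max(replicasPerBroker.values());
        int min = Collections.min(replicasPerBroker.values());
        double ratio = (double) max / Math.max(min, 1);
        if (ratio > threshold) {
            System.out.printf("Warning: the most loaded broker has %.1fx as many replicas as the "
                    + "least loaded broker. This is likely due to an uneven distribution of "
                    + "brokers across racks.%n", ratio);
        }
    }

    public static void main(String[] args) {
        // Illustrative numbers: 12 existing brokers on two racks plus one new broker on its own rack.
        Map<Integer, Integer> counts = new HashMap<>();
        for (int broker = 1; broker <= 12; broker++) counts.put(broker, 10);
        counts.put(13, 60);
        warnIfUnbalanced(counts, 1.5);
    }
}
{code}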



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3177) Kafka consumer can hang when position() is called on a non-existing partition.

2018-02-08 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3177:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> Kafka consumer can hang when position() is called on a non-existing partition.
> --
>
> Key: KAFKA-3177
> URL: https://issues.apache.org/jira/browse/KAFKA-3177
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 2.0.0
>
>
> This can be easily reproduced as following:
> {code}
> {
> ...
> consumer.assign(Collections.singletonList(someNonExistingTopicPartition));
> consumer.position(someNonExistingTopicPartition);
> ...
> }
> {code}
> It seems that when position() is called, we do the following:
> 1. Fetch committed offsets.
> 2. If there are no committed offsets, try to reset the offset using the reset 
> strategy. In sendListOffsetRequest(), if the consumer does not know the 
> TopicPartition, it will refresh its metadata and retry. In this case, because 
> the partition does not exist, we fall into an infinite loop of refreshing 
> topic metadata.
> Another orthogonal issue is that if the topic in the above code piece does 
> not exist, the position() call will actually create the topic, because topic 
> metadata requests can currently auto-create topics. This is a known separate 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6542) Tables should trigger joins too, not just streams

2018-02-08 Thread Antony Stubbs (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antony Stubbs updated KAFKA-6542:
-
Description: 
At the moment it's quite possible to have a race condition when joining a 
stream with a table, if the stream event arrives first, before the table event, 
in which case the join will fail.

This is also related to bootstrapping KTables (which is what a GKTable does).

Related to: KAFKA-4113 Allow KTable bootstrap

  was:
At the moment it's quite possible to have a race condition when joining a 
stream with a table, if the stream event arrives first, before the table event, 
in which case the join will fail.

This is also related to bootstrapping KTables (which is what a GKTable does).

Related to: KAFKA-6543 Allow KTables to be bootstrapped at start up, like 
GKTables


> Tables should trigger joins too, not just streams
> -
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a 
> stream with a table, if the stream event arrives first, before the table 
> event, in which case the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap
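For readers unfamiliar with the race being described, a hedged Kafka Streams sketch (topic names, serdes, and application id are illustrative): if an "events" record for key K is processed before the "users" table has a value for K, the join sees a null or missing table side, which is the failure mode above.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;

public class StreamTableJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("events"); // assumed topic
        KTable<String, String> users = builder.table("users");     // assumed topic

        // If an "events" record for key K arrives before "users" has a value for K,
        // the joiner below runs with user == null (an inner join would drop the record):
        // the stream/table race described in this issue.
        events.leftJoin(users, (event, user) -> user + ":" + event)
              .to("enriched-events");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}
{code}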



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6542) Tables should trigger joins too, not just streams

2018-02-08 Thread Antony Stubbs (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antony Stubbs updated KAFKA-6542:
-
Description: 
At the moment it's quite possible to have a race condition when joining a 
stream with a table, if the stream event arrives first, before the table event, 
in which case the join will fail.

This is also related to bootstrapping KTables (which is what a GKTable does).

Related to: KAFKA-6543 Allow KTables to be bootstrapped at start up, like 
GKTables

  was:
At the moment it's quite possible to have a race condition when joining a 
stream with a table, if the stream event arrives first, before the table event, 
in which case the join will fail.

This is also related to bootstrapping KTables (which is what a GKTable does).


> Tables should trigger joins too, not just streams
> -
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a 
> stream with a table, if the stream event arrives first, before the table 
> event, in which case the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-6543 Allow KTables to be bootstrapped at start up, like 
> GKTables



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables

2018-02-08 Thread Antony Stubbs (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antony Stubbs updated KAFKA-6543:
-
Affects Version/s: 1.1.0
  Description: 
In some use cases, it's desirable to have KTables "fully" bootstrapped (at 
least on a best-effort basis) before the topology begins, similar to how a GKTable 
does. This could prevent join race conditions, for one, which could be a big 
problem if local KTable state has been lost.

 

Related to KAFKA-6542 Tables should trigger joins too, not just streams

  was:In some use cases, it's desirable to have KTables "fully" bootstrapped 
(at least on a best-effort basis) before the topology begins, similar to how a GKTable 
does. This could prevent join race conditions, for one, which could be a big 
problem if local KTable state has been lost.

  Component/s: streams

> Allow KTables to be bootstrapped at start up, like GKTables
> ---
>
> Key: KAFKA-6543
> URL: https://issues.apache.org/jira/browse/KAFKA-6543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> In some use cases, it's desirable to have KTables "fully" bootstrapped (at 
> least on a best-effort basis) before the topology begins, similar to how a GKTable 
> does. This could prevent join race conditions, for one, which could be a big 
> problem if local KTable state has been lost.
>  
> Related to KAFKA-6542 Tables should trigger joins too, not just streams
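For contrast, a hedged sketch of the GlobalKTable behaviour the request wants KTables to emulate (topic names are illustrative): a GlobalKTable is restored in full before processing starts, so a join against it does not race with its bootstrap.

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableBootstrapSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // A GlobalKTable is fully restored before the topology begins processing.
        GlobalKTable<String, String> reference = builder.globalTable("reference-data");
        KStream<String, String> events = builder.stream("events");

        // The KeyValueMapper picks the lookup key; because the global table is
        // bootstrapped up front, this join does not race with table loading.
        events.join(reference,
                    (eventKey, eventValue) -> eventKey,
                    (eventValue, refValue) -> refValue + ":" + eventValue)
              .to("joined-events");
        // Configuring and starting KafkaStreams is omitted; see the sketch under KAFKA-6542.
    }
}
{code}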



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables

2018-02-08 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6543:


 Summary: Allow KTables to be bootstrapped at start up, like 
GKTables
 Key: KAFKA-6543
 URL: https://issues.apache.org/jira/browse/KAFKA-6543
 Project: Kafka
  Issue Type: Improvement
Reporter: Antony Stubbs


In some use cases, it's desirable to have KTables "fully" bootstrapped (at 
least on a best-effort basis) before the topology begins, similar to how a GKTable 
does. This could prevent join race conditions, for one, which could be a big 
problem if local KTable state has been lost.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6542) Tables should trigger joins too, not just streams

2018-02-08 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6542:


 Summary: Tables should trigger joins too, not just streams
 Key: KAFKA-6542
 URL: https://issues.apache.org/jira/browse/KAFKA-6542
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Antony Stubbs


At the moment it's quite possible to have a race condition when joining a 
stream with a table, if the stream event arrives first, before the table event, 
in which case the join will fail.

This is also related to bootstrapping KTables (which is what a GKTable does).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5165) Kafka Logs Cleanup Not happening, Huge File Growth - Windows

2018-02-08 Thread Akash C (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356937#comment-16356937
 ] 

Akash C commented on KAFKA-5165:


We are facing a similar issue in 0.11.0.0. Is there a workaround for this?

> Kafka Logs Cleanup Not happening, Huge File Growth - Windows
> 
>
> Key: KAFKA-5165
> URL: https://issues.apache.org/jira/browse/KAFKA-5165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: windows, Kafka Server(Version: 0.9.0.1)
>Reporter: Manikandan P
>Priority: Major
>
> We set the following configuration in server.properties on the Kafka server 
> (version 0.9.0.1): retention hours of 1 and retention bytes of 150 MB. We also 
> modified other settings as given below.
> log.dirs=/tmp/kafka-logs  
> log.retention.hours=1
> log.retention.bytes=157286400
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> log.cleaner.enable=true
> log.cleanup.policy=delete
> After a few days, the Kafka log folder had grown to 13.2 GB. We have seen the 
> topic offsets advance past the old data, but the log files do not shrink; they 
> still contain all of the old data and keep growing. Could you help us find out 
> why Kafka is not physically deleting the logs? Do we need to change any 
> configuration?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356842#comment-16356842
 ] 

ASF GitHub Bot commented on KAFKA-6424:
---

h314to opened a new pull request #4549: KAFKA-6424: 
QueryableStateIntegrationTest#queryOnRebalance should accept raw text
URL: https://github.com/apache/kafka/pull/4549
 
 
   * Remove the .toLowerCase call in the word count stream
   * Add a method to read text from a file, and move the text to a resources file
   * Minor cleanup: change argument order in assertEquals
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> QueryableStateIntegrationTest#queryOnRebalance should accept raw text
> -
>
> Key: KAFKA-6424
> URL: https://issues.apache.org/jira/browse/KAFKA-6424
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Filipe Agapito
>Priority: Minor
>  Labels: newbie, unit-test
>
> I was using QueryableStateIntegrationTest#queryOnRebalance for some 
> performance testing by adding more sentences to inputValues.
> I found that when a sentence contains an upper case letter, the test would 
> time out.
> I got around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} 
> before the split.
> Ideally we could specify the path to a text file which contains the text. The 
> test can then read the text file and generate the input array.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text

2018-02-08 Thread Filipe Agapito (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356836#comment-16356836
 ] 

Filipe Agapito commented on KAFKA-6424:
---

This happens because {{createCountStream}} is using 
{{value.toLowerCase(Locale.getDefault())}}, so an uppercase word never shows up 
in the stream and {{verifyAllKVKeys}} times out. I fixed it by removing the 
{{.toLowerCase}} from {{createCountStream}}.

In order to comply with the last requirement in the description, I've also added 
a method that reads a file in the test resources and generates the inputValue 
list.

I'll be creating a pull request shortly.
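A hedged sketch of what such a helper might look like (the class name and resource path are assumptions, not taken from the pull request): read a classpath resource and return its non-empty lines as the test input.

{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class TestInputReader {
    /** Reads a text file from the test classpath and returns its non-empty lines. */
    static List<String> readInputValues(String resourceName) throws IOException {
        InputStream in = TestInputReader.class.getResourceAsStream(resourceName);
        if (in == null) {
            throw new IOException("resource not found: " + resourceName);
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines()
                         .map(String::trim)
                         .filter(line -> !line.isEmpty())
                         .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // "/queryOnRebalanceInput.txt" is a hypothetical resource name.
        readInputValues("/queryOnRebalanceInput.txt").forEach(System.out::println);
    }
}
{code}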

> QueryableStateIntegrationTest#queryOnRebalance should accept raw text
> -
>
> Key: KAFKA-6424
> URL: https://issues.apache.org/jira/browse/KAFKA-6424
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Filipe Agapito
>Priority: Minor
>  Labels: newbie, unit-test
>
> I was using QueryableStateIntegrationTest#queryOnRebalance for some 
> performance testing by adding more sentences to inputValues.
> I found that when a sentence contains an upper case letter, the test would 
> time out.
> I got around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} 
> before the split.
> Ideally we could specify the path to a text file which contains the text. The 
> test can then read the text file and generate the input array.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-08 Thread Anh Le (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356819#comment-16356819
 ] 

Anh Le commented on KAFKA-6541:
---

[~huxi_2b] I created [a PR|https://github.com/apache/kafka/pull/4548]  for this 
issue. Please have a look at it.

> StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
> --
>
> Key: KAFKA-6541
> URL: https://issues.apache.org/jira/browse/KAFKA-6541
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Anh Le
>Priority: Major
>
> There's something wrong with our client library when sending heartbeats. 
> This bug seems to be identical to this one: 
> [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]
>  
> Here's the log:
>  
> {quote}2018-02-08 13:55:01,102 ERROR 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread
>  Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
> default-group':
> java.lang.StackOverflowError: null
>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>  at 
> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)
>  at 
> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
>  at 
> ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
>  at 
> ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
>  at 
> ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
>  at 
> ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
>  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
>  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
>  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
>  at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)
>  at ch.qos.logback.classic.Logger.info(Logger.java:583)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> 

[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356818#comment-16356818
 ] 

ASF GitHub Bot commented on KAFKA-6541:
---

anhldbk opened a new pull request #4548: KAFKA-6541: Fix a StackOverflow bug in 
kafka-coordinator-heartbeat-thread
URL: https://github.com/apache/kafka/pull/4548
 
 
   
   - The bug is caused by infinite recursive calls to 
`ConsumerNetworkClient.disconnect()` 
   
   - In class 
`org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient`, I 
introduced a boolean field named `disconnecting` to prevent such calls happen
   
   ```java
    public void disconnect(Node node) {
        synchronized (this) {
            if (disconnecting) {
                return;
            }
            disconnecting = true;
            failUnsentRequests(node, DisconnectException.INSTANCE);
            client.disconnect(node.idString());
        }

        // We need to poll to ensure callbacks from in-flight requests on
        // the disconnected socket are fired
        pollNoWakeup();
    }
   ```
   
   
   
   No test to implement (I think this is a trivial fix)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
> --
>
> Key: KAFKA-6541
> URL: https://issues.apache.org/jira/browse/KAFKA-6541
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Anh Le
>Priority: Major
>
> There's something wrong with our client library when sending heartbeats. 
> This bug seems to be identical to this one: 
> [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]
>  
> Here's the log:
>  
> {quote}2018-02-08 13:55:01,102 ERROR 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread
>  Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
> default-group':
> java.lang.StackOverflowError: null
>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>  at 
> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)
>  at 
> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>  at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
>  at 
> ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
>  at 
> ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
>  at 
> ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
>  at 
> ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
>  at 
> ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
>  at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
>  at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
>  at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
>  at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)
>  at ch.qos.logback.classic.Logger.info(Logger.java:583)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> 

[jira] [Updated] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-08 Thread Anh Le (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anh Le updated KAFKA-6541:
--
Description: 
There's something wrong with our client library when sending heartbeats. This 
bug seems to be identical to this one: 
[http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]

 

Here's the log:

 
{quote}2018-02-08 13:55:01,102 ERROR 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread 
Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
default-group':
java.lang.StackOverflowError: null
 at java.lang.StringBuilder.append(StringBuilder.java:136)
 at 
org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)
 at 
org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
 at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
 at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
 at 
ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
 at 
ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
 at 
ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
 at 
ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
 at 
ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
 at 
ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
 at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
 at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
 at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
 at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)
 at ch.qos.logback.classic.Logger.info(Logger.java:583)
 at 
org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
 at 

[jira] [Commented] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-08 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356756#comment-16356756
 ] 

huxihx commented on KAFKA-6541:
---

It seems `coordinatorDead` was being called recursively, which is borne out by the 
repeating pattern of line numbers (797 -> 653 -> 388 -> 416 -> ... -> 797 -> 
653 -> ...).

> StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread
> --
>
> Key: KAFKA-6541
> URL: https://issues.apache.org/jira/browse/KAFKA-6541
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Anh Le
>Priority: Major
>
> There's something wrong with our client library when sending heartbeats. 
> This bug seems to be identical to this one: 
> [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]
>  
> Here's the log:
>  
> {{2018-02-08 13:55:01,102 ERROR 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread
>  Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | 
> default-group':}}
> {{java.lang.StackOverflowError: null}}
> {{ at java.lang.StringBuilder.append(StringBuilder.java:136)}}
> {{ at 
> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)}}
> {{ at 
> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)}}
> {{ at 
> org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)}}
> {{ at 
> org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)}}
> {{ at 
> ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)}}
> {{ at 
> ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)}}
> {{ at 
> ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)}}
> {{ at 
> ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)}}
> {{ at 
> ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)}}
> {{ at 
> ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)}}
> {{ at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)}}
> {{ at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)}}
> {{ at 
> ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)}}
> {{ at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)}}
> {{ at ch.qos.logback.classic.Logger.info(Logger.java:583)}}
> {{ at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}}
> {{ at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}}
> {{ at 
> 

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, a call stack like the 
following will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following call stack: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which 
generates 10 log lines, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.
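To make the m x n blow-up concrete, a hedged Java sketch (class and method names are illustrative; the controller code itself is Scala) contrasting the current per-partition scan of the to-be-deleted set with a plain membership check on only the partitions passed in:

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UpdateMetadataCostSketch {
    // Current behaviour (simplified): every affected partition walks the whole
    // partitionsToBeDeleted set, so the work is O(m * n).
    static long scanPerPartition(List<String> affected, Set<String> toBeDeleted) {
        long work = 0;
        for (String partition : affected) {
            for (String deleted : toBeDeleted) {
                work++; // one unit of work (e.g. one log line) per pair
            }
        }
        return work;
    }

    // Proposed behaviour (simplified): only look at the partitions passed in and
    // ask the set whether each one is being deleted, so the work is O(m).
    static long membershipCheck(List<String> affected, Set<String> toBeDeleted) {
        long work = 0;
        for (String partition : affected) {
            toBeDeleted.contains(partition); // O(1) lookup instead of a full scan
            work++;
        }
        return work;
    }

    public static void main(String[] args) {
        List<String> affected = new ArrayList<>();
        Set<String> toBeDeleted = new HashSet<>();
        for (int i = 0; i < 5000; i++) {
            affected.add("a1-" + i);
            toBeDeleted.add("a0-" + i);
        }
        System.out.println("scan:  " + scanPerPartition(affected, toBeDeleted)); // 25,000,000
        System.out.println("check: " + membershipCheck(affected, toBeDeleted));  //      5,000
    }
}
{code}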

Performance improvement benchmark: if we perform the steps above with topic a0 
having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes 
down, it takes the controller ~4 minutes to go through controlled shutdown, 
detect the broker failure through zookeeper, and transition all replicas to 
OfflineReplica state. After applying the patch, the same process takes 23 
seconds.

The testing done:
After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, a call stack like the 
following will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following call stack: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which 
generates 10 log lines, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.

Performance improvement benchmark: if we perform the steps above with topic a0 
having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes 
down, it takes the controller ~4 minutes to go through controlled shutdown, 
detect the broker failure through zookeeper, and transition all replicas to 
OfflineReplica state. After applying the patch, the same process takes 23 
seconds.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've