[jira] [Updated] (KAFKA-7620) ConfigProvider is broken for KafkaConnect when TTL is not null

2018-11-12 Ye Ji (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ye Ji updated KAFKA-7620:
-
Affects Version/s: 2.0.0
  Summary: ConfigProvider is broken for KafkaConnect when TTL is 
not null  (was: ConfigProvider is broken for connect when TTL is not null)

> ConfigProvider is broken for KafkaConnect when TTL is not null
> --
>
> Key: KAFKA-7620
> URL: https://issues.apache.org/jira/browse/KAFKA-7620
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Ye Ji
>Priority: Major
>
> If the ConfigData returned by a ConfigProvider.get implementation has a
> non-null, non-negative TTL, it triggers infinite recursion. Here is an
> excerpt of the stack trace:
> {code:java}
>   at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>   at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>   at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
>   at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
>   at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>   at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>   at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> {code}
> Basically:
> 1) If the config provider returns a non-null TTL, the Connect runtime tries
> to schedule a reload in the future.
> 2) The scheduleReload function reads the config again to determine whether a
> restart is needed, by calling
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to
> transform the config.
> 3) The transform function calls the config provider again, gets a non-null
> TTL, and calls scheduleReload, which brings us back to step 1.
> To reproduce, fork the provided
> [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java]
> and add a non-negative TTL to the ConfigData returned by its get methods.
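The three steps above form a cycle: transforming the config schedules a reload, and the reload path re-reads the transformed config, which runs the transformation again. The following dependency-free Java sketch models that cycle. `TtlReloadModel`, `providerTtlMs`, and the `MAX_DEPTH` guard are hypothetical stand-ins (the guard replaces the stack overflow seen in the trace), and the `reloadReadsTransformedConfig = false` variant shows one assumed way to break the cycle, not necessarily the upstream fix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the reload cycle described in KAFKA-7620.
// None of these names are Kafka APIs.
class TtlReloadModel {
    // Stands in for the JVM stack limit that the real recursion eventually hits.
    private static final int MAX_DEPTH = 100;

    private final boolean reloadReadsTransformedConfig;
    private final List<Long> scheduledReloadsMs = new ArrayList<>();
    private int depth = 0;

    TtlReloadModel(boolean reloadReadsTransformedConfig) {
        this.reloadReadsTransformedConfig = reloadReadsTransformedConfig;
    }

    // Stands in for ConfigProvider.get returning ConfigData with a non-null TTL.
    private Long providerTtlMs() {
        return 60_000L;
    }

    // Step 1: transforming the config schedules a reload when a TTL is present.
    void transform() {
        Long ttl = providerTtlMs();
        if (ttl != null && ttl >= 0) {
            scheduleReload(ttl);
        }
    }

    // Steps 2-3: if the reload path re-reads the *transformed* config, it calls
    // transform() again, which schedules another reload, and so on.
    private void scheduleReload(long ttlMs) {
        depth++;
        try {
            if (depth > MAX_DEPTH) {
                throw new IllegalStateException("reload cycle did not terminate");
            }
            if (reloadReadsTransformedConfig) {
                transform();
            }
            scheduledReloadsMs.add(ttlMs);
        } finally {
            depth--;
        }
    }

    int scheduledReloads() {
        return scheduledReloadsMs.size();
    }
}
```

With `new TtlReloadModel(true).transform()` the guard trips, mirroring the repeating frames in the trace; with `false`, each call schedules exactly one reload and returns.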



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7620) ConfigProvider is broken for connect when TTL is not null

2018-11-12 Ye Ji (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ye Ji updated KAFKA-7620:
-
Description: 
If the ConfigData returned by a ConfigProvider.get implementation has a non-null,
non-negative TTL, it triggers infinite recursion. Here is an excerpt of the stack
trace:
{code:java}
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
{code}


Basically:
1) If the config provider returns a non-null TTL, the Connect runtime tries to
schedule a reload in the future.
2) The scheduleReload function reads the config again to determine whether a
restart is needed, by calling
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to transform
the config.
3) The transform function calls the config provider again, gets a non-null TTL,
and calls scheduleReload, which brings us back to step 1.


To reproduce, fork the provided
[FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java]
and add a non-negative TTL to the ConfigData returned by its get methods.

  was:
If the ConfigData returned by a ConfigProvider.get implementation has a non-null,
non-negative TTL, it triggers infinite recursion. Here is an excerpt of the stack
trace:
{code:java}
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
{code}


Basically:
1) If the config provider returns a non-null TTL, the Connect runtime tries to
schedule a reload in the future.
2) The scheduleReload function reads the config again to determine whether a
restart is needed, by calling
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to transform
the config.
3) The transform function calls the config provider again, gets a non-null TTL,
and calls scheduleReload, which brings us back to step 1.


To reproduce, fork the provided FileConfigProvider and add a non-negative TTL
to the ConfigData returned by its get methods.



[jira] [Commented] (KAFKA-7605) Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`

2018-11-12 ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684773#comment-16684773
 ] 

ASF GitHub Bot commented on KAFKA-7605:
---

ijuma closed pull request #5890: KAFKA-7605; Retry async commit failures in 
integration test cases to fix flaky tests
URL: https://github.com/apache/kafka/pull/5890
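The PR title describes the approach: when an asynchronous commit fails with a retriable error, resend it from the callback, and only record a result once a non-retriable outcome (success or a real error) arrives. Below is a dependency-free Java sketch of that retry-in-callback pattern. `CommitCallback`, `RetriableCommitFailure`, `FlakyCommitter`, and `RetryingCommitCallback` are hypothetical stand-ins for Kafka's OffsetCommitCallback, RetriableCommitFailedException, and KafkaConsumer.commitAsync; unlike the real consumer, which invokes commit callbacks from poll(), this fake completes each commit synchronously.

```java
// Hypothetical stand-in for Kafka's OffsetCommitCallback (offsets omitted).
interface CommitCallback {
    void onComplete(Exception exception);
}

// Hypothetical stand-in for RetriableCommitFailedException.
class RetriableCommitFailure extends RuntimeException {
    RetriableCommitFailure(String message) {
        super(message);
    }
}

// Fake committer that fails the first `failuresBeforeSuccess` attempts with a
// retriable error, then succeeds. Callbacks run synchronously here.
class FlakyCommitter {
    private int remainingFailures;
    private int attempts = 0;

    FlakyCommitter(int failuresBeforeSuccess) {
        this.remainingFailures = failuresBeforeSuccess;
    }

    void commitAsync(CommitCallback callback) {
        attempts++;
        if (remainingFailures > 0) {
            remainingFailures--;
            callback.onComplete(new RetriableCommitFailure("coordinator unavailable"));
        } else {
            callback.onComplete(null); // null signals success, as in OffsetCommitCallback
        }
    }

    int attempts() {
        return attempts;
    }
}

// Resends on retriable failures; records completion and any terminal error.
class RetryingCommitCallback implements CommitCallback {
    private final FlakyCommitter committer;
    boolean isComplete = false;
    Exception error = null;

    RetryingCommitCallback(FlakyCommitter committer) {
        this.committer = committer;
    }

    @Override
    public void onComplete(Exception exception) {
        if (exception instanceof RetriableCommitFailure) {
            committer.commitAsync(this); // retry instead of failing the test
        } else {
            isComplete = true;
            error = exception; // stays null on success
        }
    }
}
```

With three injected failures, the callback resends three times and the fourth attempt succeeds, which is the behavior the `RetryCommitCallback` in the diff below aims for.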
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 3e67b1876ec..e407c023e37 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -84,9 +84,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
 consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0)
 
 // check async commit callbacks
-val commitCallback = new CountConsumerCommitCallback()
-consumer.commitAsync(commitCallback)
-awaitCommitCallback(consumer, commitCallback)
+sendAndAwaitAsyncCommit(consumer)
   }
 
   @Test
@@ -191,12 +189,38 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
 records
   }
 
-  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
-  commitCallback: CountConsumerCommitCallback,
-  count: Int = 1): Unit = {
-TestUtils.pollUntilTrue(consumer, () => commitCallback.successCount >= count,
+  protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
+  offsetsOpt: Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {
+
+def sendAsyncCommit(callback: OffsetCommitCallback) = {
+  offsetsOpt match {
+case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
+case None => consumer.commitAsync(callback)
+  }
+}
+
+class RetryCommitCallback extends OffsetCommitCallback {
+  var isComplete = false
+  var error: Option[Exception] = None
+
+  override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
+exception match {
+  case e: RetriableCommitFailedException =>
+sendAsyncCommit(this)
+  case e =>
+isComplete = true
+error = Option(e)
+}
+  }
+}
+
+val commitCallback = new RetryCommitCallback
+
+sendAsyncCommit(commitCallback)
+TestUtils.pollUntilTrue(consumer, () => commitCallback.isComplete,
   "Failed to observe commit callback before timeout", waitTimeMs = 1)
-assertEquals(count, commitCallback.successCount)
+
+assertEquals(None, commitCallback.error)
   }
 
   protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = {
@@ -209,21 +233,22 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
 // The best way to verify that the current membership is still active is to commit offsets.
 // This would fail if the group had rebalanced.
 val initialRevokeCalls = rebalanceListener.callsToRevoked
-val commitCallback = new CountConsumerCommitCallback
-consumer.commitAsync(commitCallback)
-awaitCommitCallback(consumer, commitCallback)
+sendAndAwaitAsyncCommit(consumer)
 assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked)
   }
 
   protected class CountConsumerCommitCallback extends OffsetCommitCallback {
 var successCount = 0
 var failCount = 0
+var lastError: Option[Exception] = None
 
 override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
-  if (exception == null)
+  if (exception == null) {
 successCount += 1
-  else
+  } else {
 failCount += 1
+lastError = Some(exception)
+  }
 }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 42b3984e305..5e590cf1df3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -478,14 +478,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 // async commit
 val asyncMetadata = new OffsetAndMetadata(10, "bar")
-val callback = new CountConsumerCommitCallback
-consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-awaitCommitCallback(consumer, callback)
+

[jira] [Commented] (KAFKA-7572) Producer should not send requests with negative partition id

2018-11-12 Yaodong Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684574#comment-16684574
 ] 

Yaodong Yang commented on KAFKA-7572:
-

I updated this PR to change the exception type. Please let me know if you have
any questions about this issue.

> Producer should not send requests with negative partition id
> 
>
> Key: KAFKA-7572
> URL: https://issues.apache.org/jira/browse/KAFKA-7572
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Yaodong Yang
>Priority: Major
>
> h3. Issue:
> In one Kafka producer log from our users, we found the following weird one:
> timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to 
> Kafka failed with: ",exception="java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topicName--2: 30042 ms has passed since batch creation plus linger time
>  at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
>  at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
>  at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for topicName--2: 30042 ms has passed since batch creation plus 
> linger time"
> After a few hours of debugging, we finally understood the root cause of this
> issue:
>  # The producer used a buggy custom Partitioner, which sometimes generated
> negative partition ids for new records.
>  # The corresponding produce requests were rejected by brokers, because a
> partition with a negative id is illegal.
>  # The client kept refreshing its local cluster metadata, but could not send
> produce requests successfully.
>  # In the above log, we found a suspicious string: "topicName--2".
>  # According to the source code, this string is formatted in the log as
> TopicName+"-"+PartitionId.
>  # It is easy to miss that there are two consecutive dashes in the above log.
>  # Eventually, we realized that the second dash was a negative sign, so the
> partition id was -2, not 2.
>  # The bug was in the custom Partitioner.
> h3. Proposal:
>  # The producer should check the partition id before sending requests to
> brokers.
>  # If the partition id is negative, it should throw an IllegalStateException.
>  # Such a quick check can save a lot of time for people debugging their
> producer code.
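Both the confusing log label and the proposed fail-fast check fit in a few lines of plain Java. `PartitionCheck` and its methods are hypothetical helpers, not KafkaProducer APIs; the exception type follows the original proposal, although the comment above notes that the PR later changed the exception type.

```java
// Hypothetical helpers illustrating KAFKA-7572; not part of the Kafka producer API.
final class PartitionCheck {
    private PartitionCheck() {
    }

    // Mirrors the TopicName + "-" + PartitionId format seen in the log,
    // so partition -2 renders as "topicName--2".
    static String label(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Fail-fast check from the proposal: reject a negative partition id
    // before the record is queued for sending.
    static int validate(String topic, int partition) {
        if (partition < 0) {
            throw new IllegalStateException(
                "Partitioner returned invalid partition " + partition + " for topic " + topic);
        }
        return partition;
    }
}
```

`PartitionCheck.label("topicName", -2)` yields `topicName--2`, the string from the log, while `validate` surfaces the bad partitioner output immediately instead of after a 30-second batch expiry.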





[jira] [Created] (KAFKA-7620) ConfigProvider is broken for connect when TTL is not null

2018-11-12 Ye Ji (JIRA)
Ye Ji created KAFKA-7620:


 Summary: ConfigProvider is broken for connect when TTL is not null
 Key: KAFKA-7620
 URL: https://issues.apache.org/jira/browse/KAFKA-7620
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.1
Reporter: Ye Ji


If the ConfigData returned by a ConfigProvider.get implementation has a non-null,
non-negative TTL, it triggers infinite recursion. Here is an excerpt of the stack
trace:
{code:java}
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
{code}


Basically:
1) If the config provider returns a non-null TTL, the Connect runtime tries to
schedule a reload in the future.
2) The scheduleReload function reads the config again to determine whether a
restart is needed, by calling
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to transform
the config.
3) The transform function calls the config provider again, gets a non-null TTL,
and calls scheduleReload, which brings us back to step 1.


To reproduce, fork the provided FileConfigProvider and add a non-negative TTL
to the ConfigData returned by its get methods.





[jira] [Resolved] (KAFKA-7612) Fix javac warnings and enable warnings as errors

2018-11-12 Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7612.

   Resolution: Fixed
Fix Version/s: 2.2.0

> Fix javac warnings and enable warnings as errors
> 
>
> Key: KAFKA-7612
> URL: https://issues.apache.org/jira/browse/KAFKA-7612
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.2.0
>
>
> The only way to keep warnings away is to treat them as errors.





[jira] [Resolved] (KAFKA-7605) Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`

2018-11-12 Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7605.

   Resolution: Fixed
Fix Version/s: 2.2.0

> Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`
> ---
>
> Key: KAFKA-7605
> URL: https://issues.apache.org/jira/browse/KAFKA-7605
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.2.0
>
>
> {code}
> java.lang.AssertionError: Failed to observe commit callback before timeout
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:351)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:761)
>   at kafka.utils.TestUtils$.pollUntilTrue(TestUtils.scala:727)
>   at kafka.api.BaseConsumerTest.awaitCommitCallback(BaseConsumerTest.scala:198)
>   at kafka.api.BaseConsumerTest.ensureNoRebalance(BaseConsumerTest.scala:214)
>   at kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:117)
> {code}
> Probably just need to increase the timeout a little.
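The trace shows `TestUtils.pollUntilTrue` delegating to `waitUntilTrue`, which fails once its timeout elapses. A generic Java sketch of such a poll-until-true loop helps show the trade-off: a longer timeout only helps if the condition can eventually become true, which is why retrying retriable commit failures (the approach taken in PR #5890 above) matters as well. `PollUntil` and its signature are hypothetical; it returns `false` on timeout instead of failing an assertion.

```java
import java.util.function.BooleanSupplier;

// Hypothetical poll-until-true helper, loosely modeled on the test utility
// named in the trace; not Kafka's implementation.
final class PollUntil {
    private PollUntil() {
    }

    // Runs pollAction, then checks condition, until it holds or timeoutMs elapses.
    // Returns true if the condition was observed, false on timeout or interrupt.
    static boolean pollUntilTrue(Runnable pollAction, BooleanSupplier condition,
                                 long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            pollAction.run(); // e.g. consumer.poll(...), which fires commit callbacks
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```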





[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-11-12 Gardella Juan Pablo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684663#comment-16684663
 ] 

Gardella Juan Pablo commented on KAFKA-6188:


I have the same issue with the latest (2.0.1) Kafka server on Windows:
{code:java}
[2018-11-13 00:10:10,348] ERROR Error while renaming dir for __consumer_offsets-3 in log dir C:\tmp\kafka-logs3 (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp\kafka-logs3\__consumer_offsets-3 -> C:\tmp\kafka-logs3\__consumer_offsets-3.a05956d903874d3f8ccca890cb5278b6-delete
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
    at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
    at java.nio.file.Files.move(Files.java:1395)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:786)
    at kafka.log.Log.$anonfun$renameDir$2(Log.scala:689)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.log.Log.maybeHandleIOException(Log.scala:1842)
    at kafka.log.Log.renameDir(Log.scala:687)
    at kafka.log.LogManager.asyncDelete(LogManager.scala:833)
    at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:271)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.cluster.Partition.delete(Partition.scala:265)
    at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:341)
    at kafka.server.ReplicaManager.$anonfun$stopReplicas$2(ReplicaManager.scala:371)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:369)
    at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:200)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:111)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.nio.file.AccessDeniedException: C:\tmp\kafka-logs3\__consumer_offsets-3 -> C:\tmp\kafka-logs3\__consumer_offsets-3.a05956d903874d3f8ccca890cb5278b6-delete
        at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
        at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
        at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
        at java.nio.file.Files.move(Files.java:1395)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:783)
        ... 22 more
{code}
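The trace shows `Utils.atomicMoveWithFallback` failing twice: the suppressed exception comes from one `Files.move` attempt (Utils.java:783) and the outer exception from a second attempt (Utils.java:786). A minimal sketch of that atomic-move-then-fallback pattern using only `java.nio.file` is below. `MoveWithFallback` is a hypothetical name, and the assumption that the fallback is a plain replacing move is an illustration, not a copy of Kafka's code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of an atomic rename with a non-atomic fallback (assumed pattern).
final class MoveWithFallback {
    private MoveWithFallback() {
    }

    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                // Fallback: a plain move that replaces an existing target.
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                // Keep the first failure visible, like the "Suppressed:" entry in the trace.
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }
}
```

When both attempts are denied, as in the report above, the caller sees the fallback's AccessDeniedException with the atomic attempt's exception attached as suppressed.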

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Assignee: Dong Lin
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> I just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The
> test environment is very simple, with only one producer and one consumer.
> Initially, everything started fine and standalone tests worked as expected.
> However, when running my code, the Kafka clients fail after approximately 10
> minutes. After that, Kafka won't start and fails with the same error.
> Deleting the logs lets it start again, but then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO 

[jira] [Commented] (KAFKA-5649) Producer is being closed generating ssl exception

2018-11-12 Behroz Sikander (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683420#comment-16683420
 ] 

Behroz Sikander commented on KAFKA-5649:


[~biplob] could you please share your experience on how you solved this issue?


@others
Has anyone found a solution to this?

> Producer is being closed generating ssl exception
> -
>
> Key: KAFKA-5649
> URL: https://issues.apache.org/jira/browse/KAFKA-5649
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.1
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   // SSL: this only applies to communication between Spark and Kafka brokers;
>   // you are still responsible for separately securing Spark inter-node communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> // SSL: this only applies to communication between Spark and Kafka brokers;
> // you are still responsible for separately securing Spark inter-node communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> In some cases, it throws an exception that leaves the Spark job stuck at that
> step. The exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>   at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>   at 
> 

[jira] [Commented] (KAFKA-7587) Support for Protocol Buffers Message Format in kafka-console-consumer script

2018-11-12 huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684789#comment-16684789
 ] 

huxihx commented on KAFKA-7587:
---

Did you try ConsoleConsumer with `--property 
value.deserializer=`? 

> Support for Protocol Buffers Message Format in kafka-console-consumer script
> 
>
> Key: KAFKA-7587
> URL: https://issues.apache.org/jira/browse/KAFKA-7587
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, tools
>Affects Versions: 2.0.0
>Reporter: Sakalya Deshpande
>Priority: Major
>
> Currently there is no support for the Protocol Buffers message format in
> kafka-console-consumer, so it is not possible to view those messages from the
> command prompt. It would be nice to have a command like this:
> ./kafka-console-consumer.sh --topic  --serializer 
> 





[jira] [Commented] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters

2018-11-12 Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683803#comment-16683803
 ] 

Stanislav Kozlovski commented on KAFKA-7618:


https://github.com/apache/kafka/pull/5905

> Trogdor - Fix /tasks endpoint parameters
> 
>
> Key: KAFKA-7618
> URL: https://issues.apache.org/jira/browse/KAFKA-7618
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It 
> supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, 
> `firstEndMs`, `lastEndMs`.
> These arguments denote milliseconds since the unix epoch.
> There is currently a bug where the endpoint parses these arguments as
> integers, whereas they should be longs (a current unix timestamp in
> milliseconds does not fit into an integer).
> This results in failing API calls; the request below returns a 500 Internal
> Server Error:
> {code:java}
> curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks
> * Trying ::1...
> * TCP_NODELAY set
> * Connected to localhost (::1) port 8889 (#0)
> > GET /coordinator/tasks?firstStartMs=154202876478 HTTP/1.1
> > Host: localhost:8889
> > User-Agent: curl/7.54.0
> > Accept: */*
> >
> < HTTP/1.1 500 Internal Server Error
> < Date: Mon, 12 Nov 2018 13:28:59 GMT
> < Content-Type: application/json
> < Content-Length: 43
> < Server: Jetty(9.4.12.v20180830)
> <
> * Connection #0 to host localhost left intact{code}
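The overflow is easy to confirm in plain Java: the timestamp from the curl example is larger than `Integer.MAX_VALUE` (2147483647), so `Integer.parseInt` rejects it while `Long.parseLong` accepts it. `EpochParams` is a hypothetical helper for illustration, not Trogdor code.

```java
// Hypothetical helper showing why epoch-millisecond parameters need a long.
final class EpochParams {
    private EpochParams() {
    }

    // True if the value parses as a 32-bit int.
    static boolean fitsInInt(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Parses a millisecond timestamp as a 64-bit long.
    static long parseEpochMs(String value) {
        return Long.parseLong(value);
    }
}
```

`Integer.MAX_VALUE` milliseconds is only about 24.9 days, so every millisecond timestamp later than late January 1970 overflows an int.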





[jira] [Commented] (KAFKA-7617) Document security primitives

2018-11-12 ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683818#comment-16683818
 ] 

ASF GitHub Bot commented on KAFKA-7617:
---

viktorsomogyi opened a new pull request #5906: KAFKA-7617: Add authorization 
primitives to security page
URL: https://github.com/apache/kafka/pull/5906
 
 
   This is a security page improvement that adds documentation about Kafka 
authorization primitives to the security page.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document security primitives
> 
>
> Key: KAFKA-7617
> URL: https://issues.apache.org/jira/browse/KAFKA-7617
> Project: Kafka
>  Issue Type: Task
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Minor
>
> Although the documentation explains how to configure authentication and 
> authorization, it does not list the security primitives (operations and 
> resources) that can be used, which makes it hard for users to set up 
> thorough authorization rules.
> This task would cover adding these to the security page of the Kafka 
> documentation.





[jira] [Created] (KAFKA-7619) Trogdor - Allow filtering tasks by state in /coordinator/tasks endpoint

2018-11-12 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7619:
--

 Summary: Trogdor - Allow filtering tasks by state in 
/coordinator/tasks endpoint
 Key: KAFKA-7619
 URL: https://issues.apache.org/jira/browse/KAFKA-7619
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It 
supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, 
`firstEndMs`, `lastEndMs`.
These arguments denote milliseconds since the unix epoch.

It would be useful to support filtering by the state of the task. We currently 
have no way of getting every `RUNNING`, `STOPPED`, or `PENDING` task unless we 
want to manually filter through everything returned by `/coordinator/tasks`.
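A minimal Java sketch of the requested filter, assuming a hypothetical `TaskState` enum holding the three states named above and a hypothetical, simplified `TaskStatus` type. Neither the real Trogdor classes nor the eventual query-parameter name are specified in this issue.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical task states, mirroring the RUNNING/STOPPED/PENDING states named in the issue.
enum TaskState { PENDING, RUNNING, STOPPED }

// Hypothetical, simplified view of one entry returned by /coordinator/tasks.
final class TaskStatus {
    final String id;
    final TaskState state;

    TaskStatus(String id, TaskState state) {
        this.id = id;
        this.state = state;
    }
}

final class TaskFilter {
    // Returns only the tasks in the requested state, preserving input order.
    // A null state means "no state filter", i.e. today's behaviour.
    static List<TaskStatus> byState(Collection<TaskStatus> tasks, TaskState state) {
        if (state == null) {
            return new ArrayList<>(tasks);
        }
        return tasks.stream()
                .filter(t -> t.state == state)
                .collect(Collectors.toList());
    }
}
```

Treating a missing state as "no filter" would keep existing `/coordinator/tasks` callers unaffected.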





[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-11-12 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683660#comment-16683660
 ] 

Rajini Sivaram commented on KAFKA-7280:
---

[~sachinu] 2.0.1 clients are compatible with 1.1.x brokers, so there will be no 
compatibility issue. Also, since the message format hasn't changed between these 
versions, there shouldn't be a performance issue either. 

If you recreate consumers when the issue occurs, it should be safe as long as the 
old consumer is closed before the new one is created. This does trigger a 
consumer rebalance, so there is a performance impact whenever this is done. 

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe, but we 
> are using it in the Kafka consumer without any synchronization even though poll() 
> from the heartbeat thread can process responses. Heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 
> bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> 

[jira] [Created] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters

2018-11-12 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7618:
--

 Summary: Trogdor - Fix /tasks endpoint parameters
 Key: KAFKA-7618
 URL: https://issues.apache.org/jira/browse/KAFKA-7618
 Project: Kafka
  Issue Type: Bug
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It 
supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, 
`firstEndMs`, `lastEndMs`.
These arguments denote milliseconds since the unix epoch.

There is a bug currently where the endpoint parses the arguments as integers, 
whereas they should be long (the current unix millisecond timestamp does not 
fit into an integer).
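A quick JDK-only check of the arithmetic behind this bug: `Integer.MAX_VALUE` is 2,147,483,647, which as milliseconds covers only about 24.8 days after the epoch, so any current timestamp fails to parse as an `int`. The class and method names below are illustrative only and are not Trogdor's.

```java
final class TimestampParamDemo {
    // Parses an epoch-millisecond timestamp the way the endpoint should: as a long.
    static long parseMs(String value) {
        return Long.parseLong(value);
    }

    // Returns true when the value cannot be parsed as a 32-bit int,
    // which is the case for every present-day epoch-millisecond timestamp.
    static boolean overflowsInt(String value) {
        try {
            Integer.parseInt(value);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }
}
```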

This results in API calls failing with an HTTP 500 Internal Server Error:
{code:java}
curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8889 (#0)
> GET /coordinator/tasks?firstStartMs=154202876478 HTTP/1.1
> Host: localhost:8889
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 500 Internal Server Error
< Date: Mon, 12 Nov 2018 13:28:59 GMT
< Content-Type: application/json
< Content-Length: 43
< Server: Jetty(9.4.12.v20180830)
<
* Connection #0 to host localhost left intact{code}





[jira] [Assigned] (KAFKA-7611) Eliminate compiler warnings in Kafka

2018-11-12 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-7611:
--

Assignee: (was: Ismael Juma)

> Eliminate compiler warnings in Kafka
> 
>
> Key: KAFKA-7611
> URL: https://issues.apache.org/jira/browse/KAFKA-7611
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>






[jira] [Updated] (KAFKA-4453) add request prioritization

2018-11-12 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-4453:
---
Issue Type: Improvement  (was: Bug)

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata[1], rejecting 
> ProduceRequests and FetchRequests[2], and data loss (for some 
> unofficial[3] definition of data loss in terms of messages beyond the high 
> watermark)[4].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> [1] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcast 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> [2] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> [3] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> [4] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
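A minimal Java sketch of the prioritization idea, assuming a single shared queue in which controller requests sort ahead of all others and arrival order is preserved within each class. The ticket does not prescribe a data structure (a separate controller queue would be an equally valid design), and `QueuedRequest` and `PrioritizedRequestQueue` are hypothetical names, not broker classes.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical request wrapper; real broker request types are not modelled.
final class QueuedRequest {
    final String name;
    final boolean fromController;
    final long seq; // arrival order, used to stay FIFO within a priority class

    QueuedRequest(String name, boolean fromController, long seq) {
        this.name = name;
        this.fromController = fromController;
        this.seq = seq;
    }
}

final class PrioritizedRequestQueue {
    private final AtomicLong arrivals = new AtomicLong();
    // Controller requests sort first (the negated flag is false for them, and
    // false < true); ties are broken by arrival order.
    private final PriorityBlockingQueue<QueuedRequest> queue = new PriorityBlockingQueue<>(
            16,
            Comparator.comparing((QueuedRequest r) -> !r.fromController)
                      .thenComparingLong(r -> r.seq));

    void add(String name, boolean fromController) {
        queue.add(new QueuedRequest(name, fromController, arrivals.getAndIncrement()));
    }

    // Returns the highest-priority request, or null if the queue is empty.
    QueuedRequest poll() {
        return queue.poll();
    }
}
```

A request-handler thread would normally block on `take()` rather than `poll()`; the non-blocking form keeps the sketch simple. The sequence number matters because `PriorityBlockingQueue` makes no ordering guarantee among elements of equal priority, so without it requests of the same class could be reordered.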





[jira] [Commented] (KAFKA-7615) Support different topic name in source and destination server in Mirrormaker

2018-11-12 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684233#comment-16684233
 ] 

Ryanne Dolan commented on KAFKA-7615:
-

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] 
(under discussion) supports this via Connect transformations (SMTs)

> Support different topic name in source and destination server in Mirrormaker
> 
>
> Key: KAFKA-7615
> URL: https://issues.apache.org/jira/browse/KAFKA-7615
> Project: Kafka
>  Issue Type: New Feature
>  Components: mirrormaker
>Reporter: Adeeti Kaushal
>Priority: Minor
>
> Currently MirrorMaker only supports using the same topic name in the source and 
> destination brokers. Support for different topic names in the source and 
> destination brokers is needed.
>  
> source broker : topic name -> topicA
> destination broker: topic name -> topicB
>  
> Mirror data from topicA to topicB
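Kafka Connect already ships a `RegexRouter` transformation that rewrites a record's topic from a regex and a replacement string, one example of the Connect transformations (SMTs) mentioned in the comment above. The Java sketch below reproduces that style of matching in isolation (rename only when the whole topic name matches); it is an illustration, not the Connect class or MirrorMaker code, and `TopicRenamer` is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopicRenamer {
    private final Pattern regex;
    private final String replacement;

    TopicRenamer(String regex, String replacement) {
        this.regex = Pattern.compile(regex);
        this.replacement = replacement;
    }

    // Renames the topic only if the entire name matches the pattern;
    // any other topic passes through unchanged.
    String rename(String topic) {
        Matcher matcher = regex.matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }
}
```

With `new TopicRenamer("topicA", "topicB")`, records from `topicA` would be written to `topicB`; a capture-group pattern such as `"(.*)"` with `"dc1.$1"` would instead prefix every topic name.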





[jira] [Commented] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684236#comment-16684236
 ] 

ASF GitHub Bot commented on KAFKA-7528:
---

stanislavkozlovski opened a new pull request #5908: KAFKA-7528: Standardize on 
Min/Avg/Max Kafka metrics' default value - NaN
URL: https://github.com/apache/kafka/pull/5908
 
 
   Note: I haven't run the whole test suite yet; bear with me until I fix test 
failures.




> Standardize on Min/Avg/Max Kafka metrics' default value
> ---
>
> Key: KAFKA-7528
> URL: https://issues.apache.org/jira/browse/KAFKA-7528
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.2.0
>
>
> KIP-386: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345]
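Per the pull request title above, the proposal is to report NaN for Min, Avg and Max when no samples exist. The stand-alone Java sketch below shows that convention over a plain list of samples; it does not use Kafka's metrics classes, and `WindowedStats` is a hypothetical name.

```java
import java.util.List;

final class WindowedStats {
    // Each statistic returns NaN when there are no samples, so "no data"
    // is reported identically for min, max and avg.
    static double min(List<Double> samples) {
        return samples.stream().mapToDouble(Double::doubleValue).min().orElse(Double.NaN);
    }

    static double max(List<Double> samples) {
        return samples.stream().mapToDouble(Double::doubleValue).max().orElse(Double.NaN);
    }

    static double avg(List<Double> samples) {
        return samples.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }
}
```

One practical consequence: NaN compares unequal to everything, including itself, so monitoring code should test for it with `Double.isNaN` rather than `==`.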





[jira] [Commented] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684267#comment-16684267
 ] 

ASF GitHub Bot commented on KAFKA-7518:
---

ijuma closed pull request #5815: KAFKA-7518: FutureRecordMetadata.get deadline 
calculation fix
URL: https://github.com/apache/kafka/pull/5815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a38bd04f906..e448d6ebdc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -254,7 +255,8 @@ private void verifyNoTransactionInFlight() {
 partition = partition(record, this.cluster);
 TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
 ProduceRequestResult result = new ProduceRequestResult(topicPartition);
-FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
+0L, 0, 0, Time.SYSTEM);
 long offset = nextOffset(topicPartition);
 Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
 RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 8fcc46ff3ff..d1a643b3196 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Time;
+
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.kafka.clients.producer.RecordMetadata;
-
 /**
  * The future result of a record send
  */
@@ -34,16 +35,18 @@
 private final Long checksum;
 private final int serializedKeySize;
 private final int serializedValueSize;
+private final Time time;
 private volatile FutureRecordMetadata nextRecordMetadata = null;
 
 public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
-Long checksum, int serializedKeySize, int serializedValueSize) {
+Long checksum, int serializedKeySize, int serializedValueSize, Time time) {
 this.result = result;
 this.relativeOffset = relativeOffset;
 this.createTimestamp = createTimestamp;
 this.checksum = checksum;
 this.serializedKeySize = serializedKeySize;
 this.serializedValueSize = serializedValueSize;
+this.time = time;
 }
 
 @Override
@@ -67,13 +70,14 @@ public RecordMetadata get() throws InterruptedException, ExecutionException {
 @Override
 public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 // Handle overflow.
-long now = System.currentTimeMillis();
-long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
+long now = time.milliseconds();
+long timeoutMillis = unit.toMillis(timeout);
+long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
 boolean occurred = this.result.await(timeout, unit);
-if (nextRecordMetadata != null)
-return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
 if (!occurred)
-throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+throw new TimeoutException("Timeout after waiting for " + timeoutMillis + " ms.");
+if (nextRecordMetadata 

[jira] [Created] (KAFKA-7617) Document security primitives

2018-11-12 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-7617:
-

 Summary: Document security primitives
 Key: KAFKA-7617
 URL: https://issues.apache.org/jira/browse/KAFKA-7617
 Project: Kafka
  Issue Type: Task
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi


Although the documentation explains how to configure authentication and 
authorization, it does not list the security primitives (operations and 
resources) that can be used, which makes it hard for users to set up 
thorough authorization rules.
This task would cover adding these to the security page of the Kafka 
documentation.





[jira] [Updated] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit

2018-11-12 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7518:
---
Fix Version/s: 2.1.1
   2.2.0

> FutureRecordMetadata.get deadline calculation from timeout is not using 
> timeunit
> 
>
> Key: KAFKA-7518
> URL: https://issues.apache.org/jira/browse/KAFKA-7518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Code below assumes that timeout is in milliseconds when calculating deadline.
> {code}
> @Override
> public RecordMetadata get(long timeout, TimeUnit unit) throws 
> InterruptedException, ExecutionException, TimeoutException {
> // Handle overflow.
> long now = System.currentTimeMillis();
> long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now 
> + timeout;
> {code}
> {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}}
>  sometimes failed for me, which led me to this code segment.
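A stand-alone Java sketch of the corrected calculation, following the approach in the PR #5815 diff earlier in this digest: convert the timeout with `unit.toMillis(timeout)` before adding it to the current time, and saturate at `Long.MAX_VALUE` instead of overflowing. The `Deadlines` class is a hypothetical helper, not a Kafka API.

```java
import java.util.concurrent.TimeUnit;

final class Deadlines {
    // The buggy form: treats the raw timeout as milliseconds whatever the unit.
    static long buggyDeadlineMs(long nowMs, long timeout) {
        return Long.MAX_VALUE - timeout < nowMs ? Long.MAX_VALUE : nowMs + timeout;
    }

    // The fixed form: convert to milliseconds first. TimeUnit.toMillis already
    // saturates at Long.MAX_VALUE, and the check below prevents overflow in the sum.
    static long deadlineMs(long nowMs, long timeout, TimeUnit unit) {
        long timeoutMs = unit.toMillis(timeout);
        return Long.MAX_VALUE - timeoutMs < nowMs ? Long.MAX_VALUE : nowMs + timeoutMs;
    }
}
```

For example, with `nowMs = 1000` and a 2-second timeout, the buggy form yields 1002, a deadline only 2 ms away, whereas the fixed form yields 3000.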





[jira] [Updated] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value

2018-11-12 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-7528:
---
Summary: Standardize on Min/Avg/Max Kafka metrics' default value  (was: 
Make Min and Max metrics' default value consistent with each other)

> Standardize on Min/Avg/Max Kafka metrics' default value
> ---
>
> Key: KAFKA-7528
> URL: https://issues.apache.org/jira/browse/KAFKA-7528
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.2.0
>
>
> KIP-386: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics]





[jira] [Updated] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value

2018-11-12 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-7528:
---
Description: KIP-386: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345]
  (was: KIP-386: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics])

> Standardize on Min/Avg/Max Kafka metrics' default value
> ---
>
> Key: KAFKA-7528
> URL: https://issues.apache.org/jira/browse/KAFKA-7528
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.2.0
>
>
> KIP-386: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345]





[jira] [Resolved] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens

2018-11-12 Thread Zhanxiang (Patrick) Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanxiang (Patrick) Huang resolved KAFKA-7452.
--
Resolution: Duplicate

KAFKA-7557 fixed this.

> Deleting snapshot files after check-pointing log recovery offsets can slow 
> down replication when truncation happens
> ---
>
> Key: KAFKA-7452
> URL: https://issues.apache.org/jira/browse/KAFKA-7452
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> After KAFKA-5829, Kafka will try to iterate through all the partition dirs to 
> delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". 
> Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following 
> places:
>  # Truncation
>  # Log dir deletion and movement
>  # Background thread checkpointing recovery offsets
> In a 2.0 deployment on a cluster with 10k partitions per broker, we found 
> that deleting useless snapshot files in the critical path of log truncation 
> can significantly slow down followers catching up with the leader during a 
> rolling bounce (~2x slower than 0.11). The reason is that we basically do an 
> "ls -R" for the whole data directory only to potentially delete the snapshot 
> files in one partition directory, because the way we identify snapshot files 
> is to list the directories and check the filename suffix.
> In our case, "listSnapshotFiles" takes ~1ms per partition directory so it 
> takes ~1ms * 10k = ~10s to just delete snapshot files in one partition after 
> the truncation, which delays future fetches in the fetcher thread.
> Here are the related code snippets:
>  LogManager.scala
>  
> {code:java}
> private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
>   for {
>     partitionToLog <- logsByDir.get(dir.getAbsolutePath)
>     checkpoint <- recoveryPointCheckpoints.get(dir)
>   } {
>     try {
>       checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
>       allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
>     } catch {
>       case e: IOException =>
>         logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
>           s"file in directory $dir", e)
>     }
>   }
> }
> {code}
>  
>  ProducerStateManager.scala
>  
> {code:java}
> private[log] def listSnapshotFiles(dir: File): Seq[File] = {
>   if (dir.exists && dir.isDirectory) {
>     Option(dir.listFiles).map { files =>
>       files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
>     }.getOrElse(Seq.empty)
>   } else Seq.empty
> }
> private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
>   listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
>     Files.deleteIfExists(file.toPath)
>   }
> }
> {code}
>  
> There are a few things that can be optimized here:
>  # We can have an in-memory cache for the snapshot files metadata (filename) 
> in ProducerStateManager to avoid calling dir.listFiles in 
> "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile".
>  # After truncation, we can try to delete snapshot files only for the 
> truncated partitions (in the replica fetcher thread, we truncate one partition 
> at a time) instead of for all partitions. Or maybe we don't even need to delete 
> snapshot files in the critical path of truncation because the background 
> log-recovery-offset-checkpoint-thread will do it periodically. This can also 
> apply to log deletion/movement.
>  # If we want to further optimize the actual snapshot file deletion, we can 
> make it async. But I am not sure whether it is needed after we have 1) and 2).
> Also, we noticed that there is no way to disable transaction/exactly-once 
> support on the broker side, even though it brings in some extra overhead 
> when no clients use this feature. Not sure whether this is a common use 
> case, but it would be useful to have a switch to avoid the extra 
> performance overhead.
>  
>  
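Suggestion 1 above (an in-memory cache of snapshot file metadata in ProducerStateManager) could look roughly like the following Java sketch. It is not Kafka's code, which is Scala: the class name, the method names and the 20-digit zero-padded `.snapshot` file-name convention are assumptions for illustration. The point is that latest/oldest lookups and range deletions become in-memory operations instead of `dir.listFiles` calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical per-partition cache: snapshot offset -> snapshot file name.
final class SnapshotCache {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    // Assumed naming convention: 20-digit zero-padded offset plus ".snapshot".
    static String fileName(long offset) {
        return String.format("%020d.snapshot", offset);
    }

    // Called whenever a snapshot file is written, so the cache mirrors the disk.
    synchronized void onSnapshotWritten(long offset) {
        snapshots.put(offset, fileName(offset));
    }

    synchronized OptionalLong latestOffset() {
        return snapshots.isEmpty() ? OptionalLong.empty() : OptionalLong.of(snapshots.lastKey());
    }

    synchronized OptionalLong oldestOffset() {
        return snapshots.isEmpty() ? OptionalLong.empty() : OptionalLong.of(snapshots.firstKey());
    }

    // Drops all snapshots with offsets strictly below `offset` and returns their
    // file names, so a caller can delete exactly those files without listing the directory.
    synchronized List<String> removeBelow(long offset) {
        SortedMap<Long, String> head = snapshots.headMap(offset); // keys < offset
        List<String> removed = new ArrayList<>(head.values());
        head.clear(); // clearing the view also removes the entries from the cache
        return removed;
    }
}
```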





[jira] [Commented] (KAFKA-6958) Allow to define custom processor names with KStreams DSL

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684450#comment-16684450
 ] 

ASF GitHub Bot commented on KAFKA-6958:
---

fhussonnois opened a new pull request #5909: KAFKA-6958: Allow to define custom 
processor names with KStreams DSL
URL: https://github.com/apache/kafka/pull/5909
 
 
   This is a WIP for the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL




> Allow to define custom processor names with KStreams DSL
> 
>
> Key: KAFKA-6958
> URL: https://issues.apache.org/jira/browse/KAFKA-6958
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: kip
>
> Currently, while building a new Topology through the KStreams DSL, the 
> processors are automatically named.
> The generated names are prefixed depending on the operation (e.g. 
> KSTREAM-SOURCE, KSTREAM-FILTER, KSTREAM-MAP, etc.).
> To debug/understand a topology it is possible to display the processor 
> lineage with the method Topology#describe(). However, a complex topology with 
> dozens of operations can be hard to understand if the processor names are not 
> relevant.
> It would be useful to be able to set more meaningful names. For example, a 
> processor name could describe the business rule performed by a map() 
> operation.
> [KIP-307|https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL]
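The DSL's generated names combine an operation prefix such as `KSTREAM-FILTER-` with a zero-padded counter (for example `KSTREAM-SOURCE-0000000000`). The hypothetical helper below sketches the behaviour this issue asks for: use a caller-supplied name when one is given and fall back to a generated name otherwise. It is not the Kafka Streams API, and the final API chosen in KIP-307 is not shown in this message.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical processor-name generator; not a Kafka Streams class.
final class ProcessorNamer {
    private final AtomicInteger index = new AtomicInteger();

    // Returns the user-supplied name if present; otherwise "<PREFIX><10-digit index>".
    // In this sketch the counter advances only for generated names, so adding a
    // custom-named processor does not shift the generated names of later ones.
    String name(String userSuppliedName, String prefix) {
        if (userSuppliedName != null && !userSuppliedName.isEmpty()) {
            return userSuppliedName;
        }
        return prefix + String.format("%010d", index.getAndIncrement());
    }
}
```

A name such as `apply-discount-rule` on a `map()` step would then appear directly in `Topology#describe()` output instead of an opaque `KSTREAM-MAP-...` identifier.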


