[jira] [Updated] (KAFKA-7620) ConfigProvider is broken for KafkaConnect when TTL is not null
[ https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ye Ji updated KAFKA-7620:
-------------------------
    Affects Version/s: 2.0.0
    Summary: ConfigProvider is broken for KafkaConnect when TTL is not null  (was: ConfigProvider is broken for connect when TTL is not null)

> ConfigProvider is broken for KafkaConnect when TTL is not null
> --------------------------------------------------------------
>
>   Key: KAFKA-7620
>   URL: https://issues.apache.org/jira/browse/KAFKA-7620
>   Project: Kafka
>   Issue Type: Bug
>   Components: KafkaConnect
>   Affects Versions: 2.0.0, 2.0.1
>   Reporter: Ye Ji
>   Priority: Major
>
> If the ConfigData returned by a ConfigProvider.get implementation has a non-null,
> non-negative ttl, it triggers infinite recursion. Here is an excerpt
> of the stack trace:
> {code:java}
> at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
> at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
> at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
> at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
> at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
> at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> {code}
> Basically,
> 1) if a non-null ttl is returned from the config provider, the Connect runtime
> tries to schedule a reload in the future;
> 2) the scheduleReload function reads the config again to see whether it is a restart,
> by calling org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform
> to transform the config;
> 3) the transform function calls the config provider and gets a non-null ttl,
> which causes scheduleReload to be called again, and we are back at step 1.
> To reproduce, simply fork the provided
> [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java],
> and add a non-negative ttl to the ConfigData returned by the get functions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
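The three steps in the report can be modelled with a toy class. This is only a sketch under stated assumptions: `ReloadCycleSketch`, its fields, and the re-entrancy guard are hypothetical stand-ins and not the real WorkerConfigTransformer code, and the guard is one illustrative way to break the cycle, not necessarily the fix adopted in Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the transform -> scheduleReload -> transform cycle from KAFKA-7620.
// All names here are hypothetical; this is not Kafka Connect code.
class ReloadCycleSketch {
    // Cut-off so the unguarded demo fails fast instead of overflowing the stack.
    static final int MAX_DEPTH = 50;

    int depth = 0;
    private final boolean guardEnabled;
    private boolean inScheduleReload = false;

    ReloadCycleSketch(boolean guardEnabled) {
        this.guardEnabled = guardEnabled;
    }

    // Step 1 and step 3: a non-null, non-negative ttl leads to scheduleReload.
    Map<String, String> transform(Map<String, String> rawConfig, Long ttl) {
        depth++;
        if (depth > MAX_DEPTH) {
            throw new IllegalStateException("runaway recursion at depth " + depth);
        }
        if (ttl != null && ttl >= 0) {
            scheduleReload(rawConfig, ttl);
        }
        return new HashMap<>(rawConfig);
    }

    // Step 2: scheduling a reload re-reads the config, which calls transform again.
    void scheduleReload(Map<String, String> rawConfig, long ttl) {
        if (guardEnabled && inScheduleReload) {
            return; // hypothetical guard: do not re-enter while a reload is being scheduled
        }
        inScheduleReload = true;
        try {
            transform(rawConfig, ttl);
        } finally {
            inScheduleReload = false;
        }
    }

    public static void main(String[] args) {
        ReloadCycleSketch guarded = new ReloadCycleSketch(true);
        guarded.transform(new HashMap<>(), 1000L);
        System.out.println("guarded depth: " + guarded.depth);
        try {
            new ReloadCycleSketch(false).transform(new HashMap<>(), 1000L);
        } catch (IllegalStateException e) {
            System.out.println("unguarded: " + e.getMessage());
        }
    }
}
```

With a null ttl the model never calls scheduleReload, which matches the report's observation that the problem only appears when the provider returns a ttl.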
[jira] [Updated] (KAFKA-7620) ConfigProvider is broken for connect when TTL is not null
[ https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ye Ji updated KAFKA-7620:
-------------------------
    Description:
If the ConfigData returned by a ConfigProvider.get implementation has a non-null, non-negative ttl, it triggers infinite recursion. Here is an excerpt of the stack trace:
{code:java}
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
{code}
Basically,
1) if a non-null ttl is returned from the config provider, the Connect runtime tries to schedule a reload in the future;
2) the scheduleReload function reads the config again to see whether it is a restart, by calling org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to transform the config;
3) the transform function calls the config provider and gets a non-null ttl, which causes scheduleReload to be called again, and we are back at step 1.

To reproduce, simply fork the provided [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java], and add a non-negative ttl to the ConfigData returned by the get functions.

  was:
If the ConfigData returned by a ConfigProvider.get implementation has a non-null, non-negative ttl, it triggers infinite recursion. Here is an excerpt of the stack trace:
{code:java}
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
{code}
Basically,
1) if a non-null ttl is returned from the config provider, the Connect runtime tries to schedule a reload in the future;
2) the scheduleReload function reads the config again to see whether it is a restart, by calling org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to transform the config;
3) the transform function calls the config provider and gets a non-null ttl, which causes scheduleReload to be called again, and we are back at step 1.

To reproduce, simply fork the provided FileConfigProvider, and add a non-negative ttl to the ConfigData returned by the get functions.
[jira] [Commented] (KAFKA-7605) Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`
[ https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684773#comment-16684773 ]

ASF GitHub Bot commented on KAFKA-7605:
---------------------------------------

ijuma closed pull request #5890: KAFKA-7605; Retry async commit failures in integration test cases to fix flaky tests
URL: https://github.com/apache/kafka/pull/5890

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 3e67b1876ec..e407c023e37 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -84,9 +84,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0)
 
     // check async commit callbacks
-    val commitCallback = new CountConsumerCommitCallback()
-    consumer.commitAsync(commitCallback)
-    awaitCommitCallback(consumer, commitCallback)
+    sendAndAwaitAsyncCommit(consumer)
   }
 
   @Test
@@ -191,12 +189,38 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     records
   }
 
-  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
-                                          commitCallback: CountConsumerCommitCallback,
-                                          count: Int = 1): Unit = {
-    TestUtils.pollUntilTrue(consumer, () => commitCallback.successCount >= count,
+  protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
+                                              offsetsOpt: Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {
+
+    def sendAsyncCommit(callback: OffsetCommitCallback) = {
+      offsetsOpt match {
+        case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
+        case None => consumer.commitAsync(callback)
+      }
+    }
+
+    class RetryCommitCallback extends OffsetCommitCallback {
+      var isComplete = false
+      var error: Option[Exception] = None
+
+      override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
+        exception match {
+          case e: RetriableCommitFailedException =>
+            sendAsyncCommit(this)
+          case e =>
+            isComplete = true
+            error = Option(e)
+        }
+      }
+    }
+
+    val commitCallback = new RetryCommitCallback
+
+    sendAsyncCommit(commitCallback)
+    TestUtils.pollUntilTrue(consumer, () => commitCallback.isComplete,
       "Failed to observe commit callback before timeout", waitTimeMs = 1)
-    assertEquals(count, commitCallback.successCount)
+
+    assertEquals(None, commitCallback.error)
   }
 
   protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = {
@@ -209,21 +233,22 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     // The best way to verify that the current membership is still active is to commit offsets.
     // This would fail if the group had rebalanced.
     val initialRevokeCalls = rebalanceListener.callsToRevoked
-    val commitCallback = new CountConsumerCommitCallback
-    consumer.commitAsync(commitCallback)
-    awaitCommitCallback(consumer, commitCallback)
+    sendAndAwaitAsyncCommit(consumer)
     assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked)
   }
 
   protected class CountConsumerCommitCallback extends OffsetCommitCallback {
     var successCount = 0
     var failCount = 0
+    var lastError: Option[Exception] = None
 
     override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
-      if (exception == null)
+      if (exception == null) {
         successCount += 1
-      else
+      } else {
         failCount += 1
+        lastError = Some(exception)
+      }
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 42b3984e305..5e590cf1df3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -478,14 +478,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // async commit
     val asyncMetadata = new OffsetAndMetadata(10, "bar")
-    val callback = new CountConsumerCommitCallback
-    consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-    awaitCommitCallback(consumer, callback)
+
[jira] [Commented] (KAFKA-7572) Producer should not send requests with negative partition id
[ https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684574#comment-16684574 ]

Yaodong Yang commented on KAFKA-7572:
-------------------------------------

I updated this PR with a change of exception type. Please let me know if there are any questions about this issue.

> Producer should not send requests with negative partition id
> -------------------------------------------------------------
>
>   Key: KAFKA-7572
>   URL: https://issues.apache.org/jira/browse/KAFKA-7572
>   Project: Kafka
>   Issue Type: Bug
>   Components: clients
>   Affects Versions: 1.0.1
>   Reporter: Yaodong Yang
>   Priority: Major
>
> h3. Issue:
> In one Kafka producer log from our users, we found the following odd entry:
> timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to Kafka failed with: ",exception="java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicName--2: 30042 ms has passed since batch creation plus linger time
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicName--2: 30042 ms has passed since batch creation plus linger time"
> After a few hours of debugging, we finally understood the root cause of this issue:
> # The producer used a buggy custom Partitioner, which sometimes generates
> negative partition ids for new records.
> # The corresponding produce requests were rejected by brokers, because it is
> illegal to have a partition with a negative id.
> # The client kept refreshing its local cluster metadata, but could not send
> produce requests successfully.
> # From the above log, we found a suspicious string "topicName--2":
> # According to the source code, the format of this string in the log is
> TopicName+"-"+PartitionId.
> # It is not easy to notice that there are 2 consecutive dashes in the above
> log.
> # Eventually, we found that the second dash was a negative sign. Therefore,
> the partition id is -2, rather than 2.
> # The bug was in the custom Partitioner.
> h3. Proposal:
> # Producer code should check the partitionId before sending requests to
> brokers.
> # If there is a negative partition id, just throw an IllegalStateException.
> # Such a quick check can save lots of time for people debugging their
> producer code.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
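The "topicName--2" string and the proposed check can be illustrated with plain Java. This is a minimal sketch, not the producer's actual code: `PartitionIdCheck` and both methods are hypothetical, and the exception type is an assumption, since the thread only says the type in the PR was changed without naming it.

```java
// Hypothetical helper illustrating the KAFKA-7572 proposal; not part of the Kafka client.
class PartitionIdCheck {
    // Mirrors the log format described in the report: TopicName + "-" + PartitionId.
    static String describe(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Fail fast before a record with a negative partition id is queued for sending.
    // IllegalArgumentException is an assumed choice of exception type.
    static int requireValidPartition(int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException(
                "Invalid partition id " + partition + ": partition ids must be non-negative");
        }
        return partition;
    }

    public static void main(String[] args) {
        // The second dash is the minus sign of the partition id.
        System.out.println(describe("topicName", -2));
        try {
            requireValidPartition(-2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

An explicit message that names the offending id avoids the ambiguity of reading "--2" in a timeout log.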
[jira] [Created] (KAFKA-7620) ConfigProvider is broken for connect when TTL is not null
Ye Ji created KAFKA-7620:
----------------------------

    Summary: ConfigProvider is broken for connect when TTL is not null
    Key: KAFKA-7620
    URL: https://issues.apache.org/jira/browse/KAFKA-7620
    Project: Kafka
    Issue Type: Bug
    Components: KafkaConnect
    Affects Versions: 2.0.1
    Reporter: Ye Ji

If the ConfigData returned by a ConfigProvider.get implementation has a non-null, non-negative ttl, it triggers infinite recursion. Here is an excerpt of the stack trace:
{code:java}
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
at org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
{code}
Basically,
1) if a non-null ttl is returned from the config provider, the Connect runtime tries to schedule a reload in the future;
2) the scheduleReload function reads the config again to see whether it is a restart, by calling org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to transform the config;
3) the transform function calls the config provider and gets a non-null ttl, which causes scheduleReload to be called again, and we are back at step 1.

To reproduce, simply fork the provided FileConfigProvider, and add a non-negative ttl to the ConfigData returned by the get functions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (KAFKA-7612) Fix javac warnings and enable warnings as errors
[ https://issues.apache.org/jira/browse/KAFKA-7612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-7612.
--------------------------------
    Resolution: Fixed
    Fix Version/s: 2.2.0

> Fix javac warnings and enable warnings as errors
> ------------------------------------------------
>
>   Key: KAFKA-7612
>   URL: https://issues.apache.org/jira/browse/KAFKA-7612
>   Project: Kafka
>   Issue Type: Sub-task
>   Reporter: Ismael Juma
>   Assignee: Ismael Juma
>   Priority: Major
>   Fix For: 2.2.0
>
> The only way to keep warnings away is to treat them as errors.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (KAFKA-7605) Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`
[ https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-7605.
--------------------------------
    Resolution: Fixed
    Fix Version/s: 2.2.0

> Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`
> -------------------------------------------------------------------
>
>   Key: KAFKA-7605
>   URL: https://issues.apache.org/jira/browse/KAFKA-7605
>   Project: Kafka
>   Issue Type: Bug
>   Components: unit tests
>   Reporter: Jason Gustafson
>   Assignee: Jason Gustafson
>   Priority: Major
>   Fix For: 2.2.0
>
> {code}
> java.lang.AssertionError: Failed to observe commit callback before timeout
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:351)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:761)
>   at kafka.utils.TestUtils$.pollUntilTrue(TestUtils.scala:727)
>   at kafka.api.BaseConsumerTest.awaitCommitCallback(BaseConsumerTest.scala:198)
>   at kafka.api.BaseConsumerTest.ensureNoRebalance(BaseConsumerTest.scala:214)
>   at kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:117)
> {code}
> Probably just need to increase the timeout a little.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed
[ https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684663#comment-16684663 ]

Gardella Juan Pablo commented on KAFKA-6188:
--------------------------------------------

I have the same issue with the latest (2.0.1) Kafka server on Windows:
{code:java}
[2018-11-13 00:10:10,348] ERROR Error while renaming dir for __consumer_offsets-3 in log dir C:\tmp\kafka-logs3 (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp\kafka-logs3\__consumer_offsets-3 -> C:\tmp\kafka-logs3\__consumer_offsets-3.a05956d903874d3f8ccca890cb5278b6-delete
  at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
  at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
  at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
  at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
  at java.nio.file.Files.move(Files.java:1395)
  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:786)
  at kafka.log.Log.$anonfun$renameDir$2(Log.scala:689)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
  at kafka.log.Log.maybeHandleIOException(Log.scala:1842)
  at kafka.log.Log.renameDir(Log.scala:687)
  at kafka.log.LogManager.asyncDelete(LogManager.scala:833)
  at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:271)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
  at kafka.cluster.Partition.delete(Partition.scala:265)
  at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:341)
  at kafka.server.ReplicaManager.$anonfun$stopReplicas$2(ReplicaManager.scala:371)
  at scala.collection.Iterator.foreach(Iterator.scala:944)
  at scala.collection.Iterator.foreach$(Iterator.scala:944)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
  at scala.collection.IterableLike.foreach(IterableLike.scala:71)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:369)
  at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:200)
  at kafka.server.KafkaApis.handle(KafkaApis.scala:111)
  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
  at java.lang.Thread.run(Thread.java:748)
  Suppressed: java.nio.file.AccessDeniedException: C:\tmp\kafka-logs3\__consumer_offsets-3 -> C:\tmp\kafka-logs3\__consumer_offsets-3.a05956d903874d3f8ccca890cb5278b6-delete
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
    at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
    at java.nio.file.Files.move(Files.java:1395)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:783)
    ... 22 more{code}

> Broker fails with FATAL Shutdown - log dirs have failed
> -------------------------------------------------------
>
>   Key: KAFKA-6188
>   URL: https://issues.apache.org/jira/browse/KAFKA-6188
>   Project: Kafka
>   Issue Type: Bug
>   Components: clients, log
>   Affects Versions: 1.0.0, 1.0.1
>   Environment: Windows 10
>   Reporter: Valentina Baljak
>   Assignee: Dong Lin
>   Priority: Blocker
>   Labels: windows
>   Attachments: Segments are opened before deletion, kafka_2.10-0.10.2.1.zip, output.txt
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The
> test environment is very simple, with only one producer and one consumer.
> Initially, everything started fine, and standalone tests worked as expected.
> However, when running my code, Kafka clients fail after approximately 10 minutes.
> Kafka won't start after that and it fails with the same error.
> Deleting logs helps to start again, and then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO
[jira] [Commented] (KAFKA-5649) Producer is being closed generating ssl exception
[ https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683420#comment-16683420 ]

Behroz Sikander commented on KAFKA-5649:
----------------------------------------

[~biplob] could you please share your experience on how you solved this issue?

@others Has anyone found a solution to this?

> Producer is being closed generating ssl exception
> -------------------------------------------------
>
>   Key: KAFKA-5649
>   URL: https://issues.apache.org/jira/browse/KAFKA-5649
>   Project: Kafka
>   Issue Type: Bug
>   Components: producer
>   Affects Versions: 0.10.2.1
>   Environment: Spark 2.2.0 and kafka 0.10.2.0
>   Reporter: Pablo Panero
>   Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am
> getting the following exception.
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
>   .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/")
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .start()
> {code}
> In some cases it throws the exception, leaving the Spark job stuck in that
> step. The exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>   at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>   at
[jira] [Commented] (KAFKA-7587) Support for Protocol Buffers Message Format in kafka-console-consumer script
[ https://issues.apache.org/jira/browse/KAFKA-7587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684789#comment-16684789 ]

huxihx commented on KAFKA-7587:
-------------------------------

Did you try ConsoleConsumer with `--property value.deserializer=`?

> Support for Protocol Buffers Message Format in kafka-console-consumer script
> ----------------------------------------------------------------------------
>
>   Key: KAFKA-7587
>   URL: https://issues.apache.org/jira/browse/KAFKA-7587
>   Project: Kafka
>   Issue Type: Improvement
>   Components: clients, tools
>   Affects Versions: 2.0.0
>   Reporter: Sakalya Deshpande
>   Priority: Major
>
> Currently there is no support for the Protocol Buffers message format in
> kafka-console-consumer, so it is not possible to see those messages from the
> command prompt. It would be nice to have a command like this:
> ./kafka-console-consumer.sh --topic --serializer

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters
[ https://issues.apache.org/jira/browse/KAFKA-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683803#comment-16683803 ]

Stanislav Kozlovski commented on KAFKA-7618:
--------------------------------------------

https://github.com/apache/kafka/pull/5905

> Trogdor - Fix /tasks endpoint parameters
> ----------------------------------------
>
>   Key: KAFKA-7618
>   URL: https://issues.apache.org/jira/browse/KAFKA-7618
>   Project: Kafka
>   Issue Type: Bug
>   Reporter: Stanislav Kozlovski
>   Assignee: Stanislav Kozlovski
>   Priority: Minor
>
> A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It
> supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`,
> `firstEndMs`, `lastEndMs`.
> These arguments denote milliseconds since the unix epoch.
> There is currently a bug where the endpoint parses the arguments as integers,
> whereas they should be longs (the current unix millisecond timestamp does not
> fit into an integer).
> This results in API calls failing with an error response (the transcript below
> shows a 500 Internal Server Error):
> {code:java}
> curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks
> * Trying ::1...
> * TCP_NODELAY set
> * Connected to localhost (::1) port 8889 (#0)
> > GET /coordinator/tasks?firstStartMs=154202876478 HTTP/1.1
> > Host: localhost:8889
> > User-Agent: curl/7.54.0
> > Accept: */*
> >
> < HTTP/1.1 500 Internal Server Error
> < Date: Mon, 12 Nov 2018 13:28:59 GMT
> < Content-Type: application/json
> < Content-Length: 43
> < Server: Jetty(9.4.12.v20180830)
> <
> * Connection #0 to host localhost left intact{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
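The integer-versus-long point can be checked with the JDK alone, using the timestamp from the curl command. This is only a sketch: `EpochMillisParsing` and its methods are hypothetical helpers and do not show how the Trogdor REST layer actually binds query parameters.

```java
// Hypothetical helper; shows why an epoch-millisecond value cannot be parsed as an int.
class EpochMillisParsing {
    // Returns true only if the text parses as a 32-bit int.
    static boolean parsesAsInt(String text) {
        try {
            Integer.parseInt(text);
            return true;
        } catch (NumberFormatException e) {
            return false; // value is outside the int range (max 2147483647) or malformed
        }
    }

    // Parses the same text as a 64-bit long, which is wide enough for epoch milliseconds.
    static long parseEpochMillis(String text) {
        return Long.parseLong(text);
    }

    public static void main(String[] args) {
        String firstStartMs = "1542028764787"; // value from the curl command in the report
        System.out.println("parses as int:  " + parsesAsInt(firstStartMs));
        System.out.println("parsed as long: " + parseEpochMillis(firstStartMs));
    }
}
```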
[jira] [Commented] (KAFKA-7617) Document security primitives
[ https://issues.apache.org/jira/browse/KAFKA-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683818#comment-16683818 ]

ASF GitHub Bot commented on KAFKA-7617:
---------------------------------------

viktorsomogyi opened a new pull request #5906: KAFKA-7617: Add authorization primitives to security page
URL: https://github.com/apache/kafka/pull/5906

This is a security page improvement that adds documentation about Kafka authorization primitives to the security page.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Document security primitives
> ----------------------------
>
>   Key: KAFKA-7617
>   URL: https://issues.apache.org/jira/browse/KAFKA-7617
>   Project: Kafka
>   Issue Type: Task
>   Reporter: Viktor Somogyi
>   Assignee: Viktor Somogyi
>   Priority: Minor
>
> Although the documentation helps with configuring authentication and
> authorization, it does not list the security primitives (operations and
> resources) that can be used, which makes it hard for users to set up
> thorough authorization rules.
> This task would cover adding these to the security page of the Kafka
> documentation.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-7619) Trogdor - Allow filtering tasks by state in /coordinator/tasks endpoint
Stanislav Kozlovski created KAFKA-7619: -- Summary: Trogdor - Allow filtering tasks by state in /coordinator/tasks endpoint Key: KAFKA-7619 URL: https://issues.apache.org/jira/browse/KAFKA-7619 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, `firstEndMs`, `lastEndMs`. These arguments denote milliseconds since the unix epoch. It would be useful to support filtering by the state of the task. We currently have no way of getting every `RUNNING`, `STOPPED`, or `PENDING` task unless we want to manually filter through everything returned by `/coordinator/tasks`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
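As a rough illustration of the state filter requested in KAFKA-7619, here is a minimal Java sketch. It is not Trogdor code: the `TaskStateFilter`, `Task`, and `idsInState` names are hypothetical, and the enum only mirrors the three states named in the ticket text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these types are assumptions, not Trogdor's actual classes.
class TaskStateFilter {
    // The three states named in the ticket text.
    enum State { PENDING, RUNNING, STOPPED }

    static final class Task {
        final String id;
        final State state;

        Task(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    // Returns the ids of all tasks whose state equals the requested one,
    // preserving the input order.
    static List<String> idsInState(List<Task> tasks, State wanted) {
        List<String> ids = new ArrayList<>();
        for (Task task : tasks) {
            if (task.state == wanted) {
                ids.add(task.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("produce-bench", State.RUNNING));
        tasks.add(new Task("network-fault", State.PENDING));
        System.out.println(idsInState(tasks, State.RUNNING));
    }
}
```

Doing this on the server side would spare callers from fetching every task and filtering the result themselves, which is the manual step the ticket complains about.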
[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683660#comment-16683660 ] Rajini Sivaram commented on KAFKA-7280: --- [~sachinu] 2.0.1 clients are compatible with 1.1.x brokers, so there will be no compatibility issue. Also, since message format hasn't changed between these versions, there shouldn't be a performance issue either. To go with recreating consumers when the issue occurs, it should be safe if the old consumer is closed before the new one is created. This does result in consumer rebalance, so there is a performance impact whenever this is done. > ConcurrentModificationException in FetchSessionHandler in heartbeat thread > -- > > Key: KAFKA-7280 > URL: https://issues.apache.org/jira/browse/KAFKA-7280 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Request/response handling in FetchSessionHandler is not thread-safe. But we > are using it in Kafka consumer without any synchronization even though poll() > from heartbeat thread can process responses. Heartbeat thread holds the > coordinator lock while processing its poll and responses, making other > operations involving the group coordinator safe. We also need to lock > FetchSessionHandler for the operations that update or read > FetchSessionHandler#sessionPartitions. 
> This exception is from a system test run on trunk of > TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: > {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at > org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) > at > org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) > {quote} > > The logs just prior to the exception show that a partition was removed from > the session: > {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition 
test_topic-1 because there is an > in-flight request to worker4:9095 (id: 3 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Completed receive from node 2 for FETCH with correlation id > 417, received > {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 > bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Added READ_UNCOMMITTED fetch request for partition > test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for > node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) >
[jira] [Created] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters
Stanislav Kozlovski created KAFKA-7618: -- Summary: Trogdor - Fix /tasks endpoint parameters Key: KAFKA-7618 URL: https://issues.apache.org/jira/browse/KAFKA-7618 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, `firstEndMs`, `lastEndMs`. These arguments denote milliseconds since the unix epoch. There is a bug currently where the endpoint parses the arguments as integers, whereas they should be long (the current unix millisecond timestamp does not fit into an integer). This results in API calls returning a 404 {code:java} curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks * Trying ::1... * TCP_NODELAY set * Connected to localhost (::1) port 8889 (#0) > GET /coordinator/tasks?firstStartMs=154202876478 HTTP/1.1 > Host: localhost:8889 > User-Agent: curl/7.54.0 > Accept: */* > < HTTP/1.1 500 Internal Server Error < Date: Mon, 12 Nov 2018 13:28:59 GMT < Content-Type: application/json < Content-Length: 43 < Server: Jetty(9.4.12.v20180830) < * Connection #0 to host localhost left intact{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
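To make the integer overflow in KAFKA-7618 concrete, here is a small Java sketch using the timestamp from the curl call above. The class and method names are hypothetical and are not taken from the Trogdor sources; the sketch only shows why a 32-bit parse rejects a current epoch-millisecond value while a 64-bit parse accepts it.

```java
// Hypothetical illustration; not the actual Trogdor resource code.
class EpochMillisParsing {
    // Parses a query parameter as epoch milliseconds; a long is wide enough.
    static long parseEpochMillis(String raw) {
        return Long.parseLong(raw);
    }

    // Returns true when Integer.parseInt rejects the value
    // (out of the 32-bit range, or not numeric at all).
    static boolean overflowsInt(String raw) {
        try {
            Integer.parseInt(raw);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String firstStartMs = "1542028764787"; // value passed to curl in the ticket
        System.out.println("fits in int: " + !overflowsInt(firstStartMs));
        System.out.println("as long: " + parseEpochMillis(firstStartMs));
    }
}
```

Integer.MAX_VALUE is 2147483647, so any epoch-millisecond timestamp later than January 1970 plus roughly 25 days is out of range for an int.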
[jira] [Assigned] (KAFKA-7611) Eliminate compiler warnings in Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-7611: -- Assignee: (was: Ismael Juma) > Eliminate compiler warnings in Kafka > > > Key: KAFKA-7611 > URL: https://issues.apache.org/jira/browse/KAFKA-7611 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat updated KAFKA-4453: --- Issue Type: Improvement (was: Bug) > add request prioritization > -- > > Key: KAFKA-4453 > URL: https://issues.apache.org/jira/browse/KAFKA-4453 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Mayuresh Gharat >Priority: Major > > Today all requests (client requests, broker requests, controller requests) to > a broker are put into the same queue. They all have the same priority. So a > backlog of requests ahead of the controller request will delay the processing > of controller requests. This causes requests in front of the controller > request to get processed based on stale state. > Side effects may include giving clients stale metadata\[1\], rejecting > ProduceRequests and FetchRequests\[2\], and data loss (for some > unofficial\[3\] definition of data loss in terms of messages beyond the high > watermark)\[4\]. > We'd like to minimize the number of requests processed based on stale state. > With request prioritization, controller requests get processed before regular > queued up requests, so requests can get processed with up-to-date state. > \[1\] Say a client's MetadataRequest is sitting in front of a controller's > UpdateMetadataRequest on a given broker's request queue. Suppose the > MetadataRequest is for a topic whose partitions have recently undergone > leadership changes and that these leadership changes are being broadcast > from the controller in the later UpdateMetadataRequest. Today the broker > processes the MetadataRequest before processing the UpdateMetadataRequest, > meaning the metadata returned to the client will be stale. The client will > waste a roundtrip sending requests to the stale partition leader, get a > NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the > topic metadata again. 
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale > metadata, causing rejected ProduceRequests. Based on how long the client acts > based on the stale metadata, the impact may or may not be visible to a > producer application. If the number of rejected ProduceRequests does not > exceed the max number of retries, the producer application would not be > impacted. On the other hand, if the retries are exhausted, the failed produce > will be visible to the producer application. > \[3\] The official definition of data loss in Kafka is when we lose a > "committed" message. A message is considered "committed" when all in-sync > replicas for that partition have applied it to their log. > \[4\] Say a number of ProduceRequests are sitting in front of a controller's > LeaderAndIsrRequest on a given broker's request queue. Suppose the > ProduceRequests are for partitions whose leadership has recently shifted out > from the current broker to another broker in the replica set. Today the > broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning > the ProduceRequests are getting processed on the former partition leader. As > part of becoming a follower for a partition, the broker truncates the log to > the high-watermark. With weaker ack settings such as acks=1, the leader may > successfully write to its own log, respond to the user with a success, > process the LeaderAndIsrRequest making the broker a follower of the > partition, and truncate the log to a point before the user's produced > messages. So users have a false sense that their produce attempt succeeded > while in reality their messages got erased. While technically part of what > they signed up for with acks=1, it can still come as a surprise. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
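The prioritization idea in KAFKA-4453 can be sketched as two queues, with the controller queue always drained first. This is only an assumption-laden illustration, not how the broker's request channel is implemented: the `PrioritizedRequestQueue` class, its method names, and the use of plain strings as requests are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of two-level request prioritization; not broker code.
class PrioritizedRequestQueue {
    private final Queue<String> controllerRequests = new ArrayDeque<>();
    private final Queue<String> regularRequests = new ArrayDeque<>();

    // Enqueues a request sent by the controller (e.g. UpdateMetadataRequest).
    synchronized void addControllerRequest(String request) {
        controllerRequests.add(request);
    }

    // Enqueues a client or broker request (e.g. MetadataRequest).
    synchronized void addRegularRequest(String request) {
        regularRequests.add(request);
    }

    // Returns the next request to process, or null when both queues are empty.
    // Controller requests always win, regardless of arrival order.
    synchronized String poll() {
        String next = controllerRequests.poll();
        return next != null ? next : regularRequests.poll();
    }

    public static void main(String[] args) {
        PrioritizedRequestQueue queue = new PrioritizedRequestQueue();
        queue.addRegularRequest("MetadataRequest");
        queue.addControllerRequest("UpdateMetadataRequest");
        System.out.println(queue.poll()); // controller request is served first
        System.out.println(queue.poll());
    }
}
```

In the scenario from footnote \[1\], the MetadataRequest arrives first but is served after the UpdateMetadataRequest, so the client would see the new leadership rather than stale metadata.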
[jira] [Commented] (KAFKA-7615) Support different topic name in source and destination server in Mirrormaker
[ https://issues.apache.org/jira/browse/KAFKA-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684233#comment-16684233 ] Ryanne Dolan commented on KAFKA-7615: - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] (under discussion) supports this via Connect transformations (SMTs) > Support different topic name in source and destination server in Mirrormaker > > > Key: KAFKA-7615 > URL: https://issues.apache.org/jira/browse/KAFKA-7615 > Project: Kafka > Issue Type: New Feature > Components: mirrormaker >Reporter: Adeeti Kaushal >Priority: Minor > > Currently mirrormaker only supports same topic name in source and destination > broker. Support for different topic names in source and destination brokers > is needed. > > source broker : topic name -> topicA > destination broker: topic name -> topicB > > MirrorData from topicA to topicB -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value
[ https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684236#comment-16684236 ] ASF GitHub Bot commented on KAFKA-7528: --- stanislavkozlovski opened a new pull request #5908: KAFKA-7528: Standardize on Min/Avg/Max Kafka metrics' default value - NaN URL: https://github.com/apache/kafka/pull/5908 Note I haven't ran the whole test suite yet, bear with me until I fix test failures This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Standardize on Min/Avg/Max Kafka metrics' default value > --- > > Key: KAFKA-7528 > URL: https://issues.apache.org/jira/browse/KAFKA-7528 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > Fix For: 2.2.0 > > > KIP-386: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
[ https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684267#comment-16684267 ] ASF GitHub Bot commented on KAFKA-7518: --- ijuma closed pull request #5815: KAFKA-7518: FutureRecordMetadata.get deadline calculation fix URL: https://github.com/apache/kafka/pull/5815 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index a38bd04f906..e448d6ebdc3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Time; import java.util.ArrayDeque; import java.util.ArrayList; @@ -254,7 +255,8 @@ private void verifyNoTransactionInFlight() { partition = partition(record, this.cluster); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); ProduceRequestResult result = new ProduceRequestResult(topicPartition); -FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0); +FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, +0L, 0, 0, Time.SYSTEM); long offset = nextOffset(topicPartition); Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback); diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 8fcc46ff3ff..d1a643b3196 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -16,13 +16,14 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.Time; + import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.kafka.clients.producer.RecordMetadata; - /** * The future result of a record send */ @@ -34,16 +35,18 @@ private final Long checksum; private final int serializedKeySize; private final int serializedValueSize; +private final Time time; private volatile FutureRecordMetadata nextRecordMetadata = null; public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp, -Long checksum, int serializedKeySize, int serializedValueSize) { +Long checksum, int serializedKeySize, int serializedValueSize, Time time) { this.result = result; this.relativeOffset = relativeOffset; this.createTimestamp = createTimestamp; this.checksum = checksum; this.serializedKeySize = serializedKeySize; this.serializedValueSize = serializedValueSize; +this.time = time; } @Override @@ -67,13 +70,14 @@ public RecordMetadata get() throws InterruptedException, ExecutionException { @Override public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // Handle overflow. -long now = System.currentTimeMillis(); -long deadline = Long.MAX_VALUE - timeout < now ? 
Long.MAX_VALUE : now + timeout; +long now = time.milliseconds(); +long timeoutMillis = unit.toMillis(timeout); +long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis; boolean occurred = this.result.await(timeout, unit); -if (nextRecordMetadata != null) -return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (!occurred) -throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms."); +throw new TimeoutException("Timeout after waiting for " + timeoutMillis + " ms."); +if (nextRecordMetadata
[jira] [Created] (KAFKA-7617) Document security primitives
Viktor Somogyi created KAFKA-7617: - Summary: Document security primitives Key: KAFKA-7617 URL: https://issues.apache.org/jira/browse/KAFKA-7617 Project: Kafka Issue Type: Task Reporter: Viktor Somogyi Assignee: Viktor Somogyi Although the documentation explains how to configure authentication and authorization, it does not list the security primitives (operations and resources) that can be used, which makes it hard for users to set up thorough authorization rules. This task would cover adding these to the security page of the Kafka documentation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
[ https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7518: --- Fix Version/s: 2.1.1 2.2.0 > FutureRecordMetadata.get deadline calculation from timeout is not using > timeunit > > > Key: KAFKA-7518 > URL: https://issues.apache.org/jira/browse/KAFKA-7518 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Andras Katona >Assignee: Andras Katona >Priority: Major > Fix For: 2.2.0, 2.1.1 > > > Code below assumes that timeout is in milliseconds when calculating deadline. > {code} > @Override > public RecordMetadata get(long timeout, TimeUnit unit) throws > InterruptedException, ExecutionException, TimeoutException { > // Handle overflow. > long now = System.currentTimeMillis(); > long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now > + timeout; > {code} > {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} > failed sometimes for me and it took me to this code segment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
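The deadline bug in KAFKA-7518 comes down to adding a timeout to a millisecond clock without converting its unit first. The following Java sketch isolates that arithmetic. The `DeadlineCalc` class and its method names are hypothetical; the corrected formula follows the merged pull request quoted earlier in this thread, and the sketch assumes a non-negative timeout.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper isolating the deadline arithmetic; not the Kafka class itself.
class DeadlineCalc {
    // Buggy variant from the ticket: treats the timeout as milliseconds
    // no matter which unit the caller passed.
    static long buggyDeadlineMillis(long nowMillis, long timeout) {
        return Long.MAX_VALUE - timeout < nowMillis ? Long.MAX_VALUE : nowMillis + timeout;
    }

    // Corrected variant: convert to milliseconds first, then clamp on overflow.
    // Assumes timeout >= 0.
    static long deadlineMillis(long nowMillis, long timeout, TimeUnit unit) {
        long timeoutMillis = unit.toMillis(timeout);
        return Long.MAX_VALUE - timeoutMillis < nowMillis
                ? Long.MAX_VALUE
                : nowMillis + timeoutMillis;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        // A 5 second timeout: the buggy version is 4995 ms too early.
        System.out.println(buggyDeadlineMillis(now, 5));
        System.out.println(deadlineMillis(now, 5, TimeUnit.SECONDS));
    }
}
```

With a timeout given in seconds, the buggy deadline is reached almost immediately, which would be consistent with the intermittent test failure mentioned in the ticket.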
[jira] [Updated] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value
[ https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-7528: --- Summary: Standardize on Min/Avg/Max Kafka metrics' default value (was: Make Min and Max metrics' default value consistent with each other) > Standardize on Min/Avg/Max Kafka metrics' default value > --- > > Key: KAFKA-7528 > URL: https://issues.apache.org/jira/browse/KAFKA-7528 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > Fix For: 2.2.0 > > > KIP-386: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value
[ https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-7528: --- Description: KIP-386: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345] (was: KIP-386: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics]) > Standardize on Min/Avg/Max Kafka metrics' default value > --- > > Key: KAFKA-7528 > URL: https://issues.apache.org/jira/browse/KAFKA-7528 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > Fix For: 2.2.0 > > > KIP-386: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens
[ https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanxiang (Patrick) Huang resolved KAFKA-7452. -- Resolution: Duplicate KAFKA-7557 fixed this. > Deleting snapshot files after check-pointing log recovery offsets can slow > down replication when truncation happens > --- > > Key: KAFKA-7452 > URL: https://issues.apache.org/jira/browse/KAFKA-7452 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0 >Reporter: Zhanxiang (Patrick) Huang >Assignee: Zhanxiang (Patrick) Huang >Priority: Major > > After KAFKA-5829, Kafka will try to iterate through all the partition dirs to > delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". > Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following > places: > # Truncation > # Log dir deletion and movement > # Background thread checkpointing recovery offsets > In 2.0 deployment on a cluster with 10k partitions per broker, we found out > that deleting useless snapshot files in the critical path of log truncation > can significantly slow down followers to catch up with leader during rolling > bounce (~2x slower than 0.11). The reason is that we basically do a "ls -R" > for the whole data directory only to potentially delete the snapshot files in > one partition directory because the way we identify snapshot files is to list > the directories and check the filename suffix. > In our case, "listSnapshotFiles" takes ~1ms per partition directory so it > takes ~1ms * 10k = ~10s to just delete snapshot files in one partition after > the truncation, which delays future fetches in the fetcher thread. 
> Here are the related code snippets: > LogManager.scala > > {code:java} > private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = { > for { > partitionToLog <- logsByDir.get(dir.getAbsolutePath) > checkpoint <- recoveryPointCheckpoints.get(dir) > } { > try { > checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) > allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint()) > } catch { > case e: IOException => > logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, > s"Disk error while writing to recovery point " + > s"file in directory $dir", e) > } > } > } > {code} > > ProducerStateManager.scala > > {code:java} > private[log] def listSnapshotFiles(dir: File): Seq[File] = { > if (dir.exists && dir.isDirectory) { > Option(dir.listFiles).map { files => > files.filter(f => f.isFile && isSnapshotFile(f)).toSeq > }.getOrElse(Seq.empty) > } else Seq.empty > } > private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => > true) { > listSnapshotFiles(dir).filter(file => > predicate(offsetFromFile(file))).foreach { file => > Files.deleteIfExists(file.toPath) > } > } > {code} > > There are a few things that can be optimized here: > # We can have an in-memory cache for the snapshot files metadata (filename) > in ProducerStateManager to avoid calling dir.listFiles in > "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile". > # After truncation, we can try to delete snapshot files only for the > truncated partitions (in replica fetcher thread, we truncate one partition at > a time) instead of all partitions. Or maybe we don't even need to delete > snapshot files in the critical path of truncation because the background > log-recovery-offset-checkpoint-thread will do it periodically. This can also > apply to log deletion/movement. > # If we want to further optimize the actual snapshot file deletion, we can > make it async. But I am not sure whether it is needed after we have 1) and 2). 
> Also, we notice that there is no way to disable transaction/exactly-once > support in the broker-side given that it will bring in some extra overhead > even though we have no clients using this feature. Not sure whether this is a > common use case, but it is useful if we can have a switch to avoid the extra > performance overhead. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
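The first optimization proposed in KAFKA-7452, an in-memory cache of snapshot file metadata, could look roughly like the Java sketch below. It is not the change that was eventually made in Kafka: the `SnapshotFileIndex` class and its methods are hypothetical, and the sketch assumes every snapshot file is registered when it is written so that no directory listing is needed afterwards.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical in-memory index of snapshot files, keyed by snapshot offset.
class SnapshotFileIndex {
    private final ConcurrentSkipListMap<Long, File> snapshots = new ConcurrentSkipListMap<>();

    // Records a snapshot file at the moment it is written.
    void register(long offset, File file) {
        snapshots.put(offset, file);
    }

    // Drops every entry with an offset strictly below the bound and returns
    // the corresponding files so the caller can delete them from disk.
    List<File> removeBelow(long offset) {
        NavigableMap<Long, File> stale = snapshots.headMap(offset, false);
        List<File> removed = new ArrayList<>(stale.values());
        stale.clear(); // clearing the view removes the entries from the index
        return removed;
    }

    // Returns the snapshot file with the highest offset, or null if none is known.
    File latest() {
        Map.Entry<Long, File> last = snapshots.lastEntry();
        return last == null ? null : last.getValue();
    }

    public static void main(String[] args) {
        SnapshotFileIndex index = new SnapshotFileIndex();
        index.register(10L, new File("10.snapshot"));
        index.register(30L, new File("30.snapshot"));
        System.out.println(index.removeBelow(25L));
        System.out.println(index.latest());
    }
}
```

Lookups and range deletions then cost a map operation rather than a directory listing per partition, which is the roughly 1 ms step the ticket measures.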
[jira] [Commented] (KAFKA-6958) Allow to define custom processor names with KStreams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684450#comment-16684450 ] ASF GitHub Bot commented on KAFKA-6958: --- fhussonnois opened a new pull request #5909: KAFKA-6958: Allow to define custom processor names with KStreams DSL URL: https://github.com/apache/kafka/pull/5909 This is a WIP for the KIP : https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow to define custom processor names with KStreams DSL > > > Key: KAFKA-6958 > URL: https://issues.apache.org/jira/browse/KAFKA-6958 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Florian Hussonnois >Priority: Minor > Labels: kip > > Currently, while building a new Topology through the KStreams DSL the > processors are automatically named. > The generated names are prefixed according to the operation (e.g. > KSTREAM-SOURCE, KSTREAM-FILTER, KSTREAM-MAP, etc). > To debug/understand a topology it is possible to display the processor > lineage with the method Topology#describe(). However, a complex topology with > dozens of operations can be hard to understand if the processor names are not > relevant. > It would be useful to be able to set more meaningful names. For example, a > processor name could describe the business rule performed by a map() > operation. > [KIP-307|https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL] -- This message was sent by Atlassian JIRA (v7.6.3#76005)